Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions packages/cli/src/cli/commands/dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,50 @@ export function registerDispatchCommands(program: Command, defaultActor: string)
),
);

addWorkspaceOption(
dispatchCmd
.command('retry <runId>')
.description('Retry a failed run by creating a new run attempt')
.option('-a, --actor <name>', 'Actor', defaultActor)
.option('--adapter <name>', 'Adapter override for retry')
.option('--objective <text>', 'Objective override for retry')
.option('--no-execute', 'Create retry run but do not execute immediately')
.option('--agents <actors>', 'Comma-separated agent identities')
.option('--max-steps <n>', 'Maximum scheduler steps', '200')
.option('--step-delay-ms <ms>', 'Delay between scheduling steps', '25')
.option('--space <spaceRef>', 'Restrict execution to one space')
.option('--timeout-ms <ms>', 'Execution timeout in milliseconds')
.option('--dispatch-mode <mode>', 'direct|self-assembly')
.option('--self-assembly-agent <agent>', 'Agent identity for self-assembly dispatch mode')
.option('--json', 'Emit structured JSON output'),
).action((runId, opts) =>
runCommand(
opts,
async () => {
const workspacePath = resolveWorkspacePath(opts);
return {
run: await workgraph.dispatch.retryRun(workspacePath, runId, {
actor: opts.actor,
adapter: opts.adapter,
objective: opts.objective,
execute: opts.execute,
agents: csv(opts.agents),
maxSteps: Number.parseInt(String(opts.maxSteps), 10),
stepDelayMs: Number.parseInt(String(opts.stepDelayMs), 10),
space: opts.space,
timeoutMs: opts.timeoutMs ? Number.parseInt(String(opts.timeoutMs), 10) : undefined,
dispatchMode: opts.dispatchMode,
selfAssemblyAgent: opts.selfAssemblyAgent,
}),
};
},
(result) => [
`Retried run: ${result.run.id} [${result.run.status}]`,
...(result.run.context?.retry_of_run_id ? [`Source run: ${String(result.run.context.retry_of_run_id)}`] : []),
],
),
);

addWorkspaceOption(
dispatchCmd
.command('followup <runId> <input>')
Expand Down
94 changes: 84 additions & 10 deletions packages/cli/src/cli/commands/trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,53 @@ export function registerTriggerCommands(program: Command, defaultActor: string):
.option('-a, --actor <name>', 'Actor', defaultActor)
.option('--event-key <key>', 'Deterministic event key for idempotency')
.option('--objective <text>', 'Override run objective')
.option('--adapter <name>', 'Adapter override for dispatched run')
.option('--execute', 'Execute the triggered run immediately')
.option('--retry-failed', 'Retry failed run when idempotency resolves to failed status')
.option('--agents <actors>', 'Comma-separated agent identities for execution')
.option('--max-steps <n>', 'Maximum scheduler steps for execution')
.option('--step-delay-ms <ms>', 'Delay between scheduling steps for execution')
.option('--space <spaceRef>', 'Restrict execution to one space')
.option('--timeout-ms <ms>', 'Execution timeout in milliseconds')
.option('--dispatch-mode <mode>', 'direct|self-assembly')
.option('--self-assembly-agent <agent>', 'Agent identity for self-assembly dispatch mode')
.option('--json', 'Emit structured JSON output'),
).action((triggerPath, opts) =>
runCommand(
opts,
() => {
async () => {
const workspacePath = resolveWorkspacePath(opts);
if (opts.execute) {
return workgraph.trigger.fireTriggerAndExecute(workspacePath, triggerPath, {
actor: opts.actor,
eventKey: opts.eventKey,
objective: opts.objective,
adapter: opts.adapter,
retryFailed: Boolean(opts.retryFailed),
executeInput: {
agents: opts.agents ? String(opts.agents).split(',').map((entry: string) => entry.trim()).filter(Boolean) : undefined,
maxSteps: opts.maxSteps ? Number.parseInt(String(opts.maxSteps), 10) : undefined,
stepDelayMs: opts.stepDelayMs ? Number.parseInt(String(opts.stepDelayMs), 10) : undefined,
space: opts.space,
timeoutMs: opts.timeoutMs ? Number.parseInt(String(opts.timeoutMs), 10) : undefined,
dispatchMode: opts.dispatchMode,
selfAssemblyAgent: opts.selfAssemblyAgent,
},
});
}
return workgraph.trigger.fireTrigger(workspacePath, triggerPath, {
actor: opts.actor,
eventKey: opts.eventKey,
objective: opts.objective,
adapter: opts.adapter,
});
},
(result) => [
...(() => {
const executedResult = result as { executed?: boolean; retriedFromRunId?: string };
if (!executedResult.executed) return [];
return [`Executed: yes${executedResult.retriedFromRunId ? ` (retried from ${executedResult.retriedFromRunId})` : ''}`];
})(),
`Fired trigger: ${result.triggerPath}`,
`Run: ${result.run.id} [${result.run.status}]`,
],
Expand All @@ -46,24 +80,64 @@ export function registerTriggerCommands(program: Command, defaultActor: string):
.command('run')
.description('Process one trigger-engine cycle')
.option('-a, --actor <name>', 'Actor', defaultActor)
.option('--execute-runs', 'Execute dispatch-run actions as full run->evidence loop')
.option('--retry-failed-runs', 'Retry failed runs when dispatch-run hits failed idempotent runs')
.option('--agents <actors>', 'Comma-separated agent identities for execution')
.option('--max-steps <n>', 'Maximum scheduler steps for execution')
.option('--step-delay-ms <ms>', 'Delay between scheduling steps for execution')
.option('--space <spaceRef>', 'Restrict execution to one space')
.option('--timeout-ms <ms>', 'Execution timeout in milliseconds')
.option('--dispatch-mode <mode>', 'direct|self-assembly')
.option('--self-assembly-agent <agent>', 'Agent identity for self-assembly dispatch mode')
.option('--json', 'Emit structured JSON output'),
).action((opts) =>
runCommand(
opts,
() => {
async () => {
const workspacePath = resolveWorkspacePath(opts);
if (opts.executeRuns) {
return workgraph.triggerEngine.runTriggerRunEvidenceLoop(workspacePath, {
actor: opts.actor,
retryFailedRuns: Boolean(opts.retryFailedRuns),
execution: {
agents: opts.agents ? String(opts.agents).split(',').map((entry: string) => entry.trim()).filter(Boolean) : undefined,
maxSteps: opts.maxSteps ? Number.parseInt(String(opts.maxSteps), 10) : undefined,
stepDelayMs: opts.stepDelayMs ? Number.parseInt(String(opts.stepDelayMs), 10) : undefined,
space: opts.space,
timeoutMs: opts.timeoutMs ? Number.parseInt(String(opts.timeoutMs), 10) : undefined,
dispatchMode: opts.dispatchMode,
selfAssemblyAgent: opts.selfAssemblyAgent,
},
});
}
return workgraph.triggerEngine.runTriggerEngineCycle(workspacePath, {
actor: opts.actor,
});
},
(result) => [
`Evaluated: ${result.evaluated} triggers`,
`Fired: ${result.fired}`,
`Errors: ${result.errors}`,
...result.triggers.map((t) =>
` ${t.triggerPath}: ${t.fired ? 'FIRED' : 'skipped'} (${t.reason})${t.error ? ` error: ${t.error}` : ''}`,
),
],
(result) => {
if ('cycle' in result) {
return [
`Evaluated: ${result.cycle.evaluated} triggers`,
`Fired: ${result.cycle.fired}`,
`Errors: ${result.cycle.errors}`,
`Executed runs: ${result.executedRuns.length} (succeeded=${result.succeeded}, failed=${result.failed}, cancelled=${result.cancelled}, skipped=${result.skipped})`,
...result.cycle.triggers.map((t) =>
` ${t.triggerPath}: ${t.fired ? 'FIRED' : 'skipped'} (${t.reason})${t.error ? ` error: ${t.error}` : ''}`,
),
...result.executedRuns.map((run) =>
` run ${run.runId}: ${run.status}${run.retriedFromRunId ? ` (retried from ${run.retriedFromRunId})` : ''}${run.error ? ` error: ${run.error}` : ''}`,
),
];
}
return [
`Evaluated: ${result.evaluated} triggers`,
`Fired: ${result.fired}`,
`Errors: ${result.errors}`,
...result.triggers.map((t) =>
` ${t.triggerPath}: ${t.fired ? 'FIRED' : 'skipped'} (${t.reason})${t.error ? ` error: ${t.error}` : ''}`,
),
];
},
),
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ exports[`schema drift regression > locks CLI option signatures for critical comm
"-a, --actor <name>",
"--event-key <key>",
"--objective <text>",
"--adapter <name>",
"--execute",
"--retry-failed",
"--agents <actors>",
"--max-steps <n>",
"--step-delay-ms <ms>",
"--space <spaceRef>",
"--timeout-ms <ms>",
"--dispatch-mode <mode>",
"--self-assembly-agent <agent>",
"--json",
"-w, --workspace <path>",
"--vault <path>",
Expand Down
159 changes: 159 additions & 0 deletions packages/kernel/src/dispatch-evidence-loop.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { spawnSync } from 'node:child_process';
import { loadRegistry, saveRegistry } from './registry.js';
import {
auditTrail,
createRun,
executeRun,
listRunEvidence,
retryRun,
} from './dispatch.js';
import { registerDispatchAdapter } from './runtime-adapter-registry.js';
import type { DispatchAdapter, DispatchAdapterExecutionInput, DispatchAdapterExecutionResult } from './runtime-adapter-contracts.js';

let workspacePath: string;
let gitAvailable = false;

beforeEach(() => {
workspacePath = fs.mkdtempSync(path.join(os.tmpdir(), 'wg-dispatch-evidence-'));
const registry = loadRegistry(workspacePath);
saveRegistry(workspacePath, registry);
const gitInit = spawnSync('git', ['init'], {
cwd: workspacePath,
stdio: 'ignore',
});
gitAvailable = (gitInit.status ?? 1) === 0;
});

afterEach(() => {
fs.rmSync(workspacePath, { recursive: true, force: true });
});

describe('dispatch run evidence loop', () => {
it('captures immutable audit trail and execution evidence', async () => {
const command = `"${process.execPath}" -e "const fs=require('fs'); fs.mkdirSync('artifacts',{recursive:true}); fs.writeFileSync('artifacts/evidence.txt','ok'); console.log('tests: 3 passed, 0 failed'); console.log('proof artifacts/evidence.txt'); console.log('https://github.com/versatly/workgraph/pull/4242');"`;
const run = createRun(workspacePath, {
actor: 'agent-evidence',
adapter: 'shell-worker',
objective: 'Collect execution evidence',
context: {
shell_command: command,
},
});

const executed = await executeRun(workspacePath, run.id, {
actor: 'agent-evidence',
timeoutMs: 10_000,
});

expect(executed.status).toBe('succeeded');
expect((executed.evidenceChain?.count ?? 0) > 0).toBe(true);
expect((executed.audit?.eventCount ?? 0) > 0).toBe(true);

const evidence = listRunEvidence(workspacePath, run.id);
const evidenceTypes = new Set(evidence.map((entry) => entry.type));
expect(evidenceTypes.has('stdout')).toBe(true);
expect(evidenceTypes.has('test-result')).toBe(true);
expect(evidenceTypes.has('pr-url')).toBe(true);
expect(evidenceTypes.has('attachment')).toBe(true);
if (gitAvailable) {
expect(evidenceTypes.has('file-change')).toBe(true);
}

const trail = auditTrail(workspacePath, run.id);
expect(trail.some((entry) => entry.kind === 'run-created')).toBe(true);
expect(trail.some((entry) => entry.kind === 'run-execution-started')).toBe(true);
expect(trail.some((entry) => entry.kind === 'run-evidence-collected')).toBe(true);
expect(trail.some((entry) => entry.kind === 'run-execution-finished')).toBe(true);
});

it('fails gracefully on execution timeout and records timeout audit event', async () => {
registerDispatchAdapter('test-timeout-adapter', () =>
makeAdapter(async () =>
new Promise<DispatchAdapterExecutionResult>(() => {
// Intentional never-resolving execution promise to trigger dispatcher timeout.
})),
);

const run = createRun(workspacePath, {
actor: 'agent-timeout',
adapter: 'test-timeout-adapter',
objective: 'Trigger timeout path',
});

const finished = await executeRun(workspacePath, run.id, {
actor: 'agent-timeout',
timeoutMs: 25,
});

expect(finished.status).toBe('failed');
expect(finished.error).toContain('timed out');
const trail = auditTrail(workspacePath, run.id);
expect(trail.some((entry) => entry.kind === 'run-execution-timeout')).toBe(true);
});

it('retries failed runs into a new attempt', async () => {
registerDispatchAdapter('test-retry-adapter', () =>
makeAdapter(async (input) => {
if (input.context?.retry_attempt) {
return {
status: 'succeeded',
output: 'retry succeeded',
logs: [],
};
}
return {
status: 'failed',
error: 'first attempt failed',
logs: [],
};
}),
);

const source = createRun(workspacePath, {
actor: 'agent-retry',
adapter: 'test-retry-adapter',
objective: 'Retry target',
});
const failed = await executeRun(workspacePath, source.id, { actor: 'agent-retry' });
expect(failed.status).toBe('failed');

const retried = await retryRun(workspacePath, source.id, {
actor: 'agent-retry',
});
expect(retried.id).not.toBe(source.id);
expect(retried.status).toBe('succeeded');
expect(retried.context?.retry_of_run_id).toBe(source.id);
expect(retried.context?.retry_attempt).toBe(1);

const sourceTrail = auditTrail(workspacePath, source.id);
expect(sourceTrail.some((entry) => entry.kind === 'run-retried')).toBe(true);
});
});

function makeAdapter(
executeImpl: (input: DispatchAdapterExecutionInput) => Promise<DispatchAdapterExecutionResult>,
): DispatchAdapter {
return {
name: 'test-adapter',
async create() {
return { runId: 'external-run', status: 'queued' };
},
async status(runId: string) {
return { runId, status: 'running' };
},
async followup(runId: string) {
return { runId, status: 'running' };
},
async stop(runId: string) {
return { runId, status: 'cancelled' };
},
async logs() {
return [];
},
execute: executeImpl,
};
}
Loading