diff --git a/README.md b/README.md index 0c206b8..780b63e 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,7 @@ Key workspace packages: Migration notes: see `docs/MIGRATION.md`. Live workspace repair runbook: see `docs/INVARIANT_REPAIR_PLAYBOOK.md`. +Realtime control-api SSE contract: see `docs/SSE_EVENTS.md`. ### Development workflow (contributors) diff --git a/docs/SSE_EVENTS.md b/docs/SSE_EVENTS.md new file mode 100644 index 0000000..417eb41 --- /dev/null +++ b/docs/SSE_EVENTS.md @@ -0,0 +1,62 @@ +# WorkGraph Control API SSE stream + +`GET /api/events` provides a real-time Server-Sent Events stream for dashboard/runtime consumers. + +## Auth + +This endpoint is under `/api`, so it uses the same bearer-token middleware as the rest of the control API. + +## Event envelope + +Every `data:` payload uses the deterministic envelope: + +```json +{ + "id": "<deterministic-event-id>", + "type": "<dashboard-event-type>", + "path": "<primitive-path>", + "actor": "<actor-id>", + "fields": { "...": "..." }, + "ts": "<iso-8601-timestamp>" +} +``` + +SSE framing includes: + +- `id: <deterministic-event-id>` (for `Last-Event-ID` reconnect) +- `event: <dashboard-event-type>` +- `data: <json-envelope>` + +## Filters + +Query params are optional and can be repeated or comma-separated: + +- `event` / `events`: filter by event type (for example `thread.created`, `run.updated`) +- `primitive` / `primitiveType`: filter by primitive type (for example `thread`, `conversation`, `plan-step`, `run`) +- `thread`: filter to one thread path/slug (`threads/foo.md`, `threads/foo`, or `foo`) + +## Reconnect + replay semantics + +The server honors the `Last-Event-ID` request header (or `lastEventId` query fallback): + +- Replay starts strictly **after** that exact event id. +- Unknown ids replay from the beginning (safe default for gap-free recovery). +- Replayed ordering is deterministic and matches ledger append order. + +## Ordering + idempotency contract + +- Event ids are deterministic per projected dashboard event (`<entry-hash>#<event-type>`). +- Event order is stable: + 1. Ledger append order across entries. + 2. Stable projection order within each entry. 
+- Clients should treat `id` as the idempotency key and dedupe on reconnect. + +## Keepalive + +The stream emits heartbeat comments periodically: + +```text +:keepalive +``` + +This keeps idle connections alive across proxies/load balancers. diff --git a/packages/control-api/src/server-events.test.ts b/packages/control-api/src/server-events.test.ts new file mode 100644 index 0000000..eb079cc --- /dev/null +++ b/packages/control-api/src/server-events.test.ts @@ -0,0 +1,169 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import { + ledger as ledgerModule, + type LedgerEntry, +} from '@versatly/workgraph-kernel'; +import { + createDashboardEventFilter, + listDashboardEventsSince, + mapLedgerEntryToDashboardEvents, + toSsePayload, +} from './server-events.js'; + +const ledger = ledgerModule; + +let workspacePath: string; + +beforeEach(() => { + workspacePath = fs.mkdtempSync(path.join(os.tmpdir(), 'wg-server-events-')); +}); + +afterEach(() => { + fs.rmSync(workspacePath, { recursive: true, force: true }); +}); + +describe('server dashboard events', () => { + it('maps deterministic per-event ids and deterministic SSE envelope shape', () => { + const entry: LedgerEntry = { + ts: '2026-03-01T00:00:00.000Z', + actor: 'agent-a', + op: 'create', + target: 'threads/deterministic.md', + type: 'thread', + data: { + status: 'open', + }, + hash: 'hash-deterministic', + prevHash: 'GENESIS', + }; + + const events = mapLedgerEntryToDashboardEvents(entry); + expect(events.map((event) => event.id)).toEqual([ + 'hash-deterministic#thread.created', + 'hash-deterministic#primitive.changed', + 'hash-deterministic#ledger.appended', + ]); + + const payload = toSsePayload(events[0]); + const dataLine = payload.split('\n').find((line) => line.startsWith('data: ')); + expect(dataLine).toBeDefined(); + const envelope = JSON.parse(dataLine!.slice('data: '.length)) as Record; + 
expect(Object.keys(envelope)).toEqual(['id', 'type', 'path', 'actor', 'fields', 'ts']); + expect(envelope.id).toBe(events[0].id); + expect(envelope.type).toBe('thread.created'); + }); + + it('emits dedicated lifecycle events for conversation, plan-step, and run primitives', () => { + const conversationEvents = mapLedgerEntryToDashboardEvents({ + ts: '2026-03-01T00:00:00.000Z', + actor: 'agent-a', + op: 'update', + target: 'conversations/sync.md', + type: 'conversation', + hash: 'hash-conversation', + prevHash: 'GENESIS', + data: { + changed: ['status'], + }, + }); + expect(conversationEvents.map((event) => event.type)).toEqual([ + 'conversation.updated', + 'primitive.changed', + 'ledger.appended', + ]); + + const stepEvents = mapLedgerEntryToDashboardEvents({ + ts: '2026-03-01T00:00:01.000Z', + actor: 'agent-b', + op: 'create', + target: 'plan-steps/ship-api.md', + type: 'plan-step', + hash: 'hash-plan-step', + prevHash: 'hash-conversation', + data: { + status: 'open', + }, + }); + expect(stepEvents.map((event) => event.type)).toEqual([ + 'plan-step.updated', + 'primitive.changed', + 'ledger.appended', + ]); + + const runEvents = mapLedgerEntryToDashboardEvents({ + ts: '2026-03-01T00:00:02.000Z', + actor: 'agent-c', + op: 'update', + target: '.workgraph/runs/run_123', + type: 'run', + hash: 'hash-run', + prevHash: 'hash-plan-step', + data: { + status: 'running', + }, + }); + expect(runEvents.map((event) => event.type)).toEqual([ + 'run.updated', + 'primitive.changed', + 'ledger.appended', + ]); + }); + + it('replays from the exact event id, not only the ledger entry id', () => { + ledger.append(workspacePath, 'seed', 'create', 'threads/replay.md', 'thread'); + ledger.append(workspacePath, 'seed', 'claim', 'threads/replay.md', 'thread'); + + const allEvents = listDashboardEventsSince(workspacePath, undefined); + expect(allEvents.length).toBeGreaterThan(4); + const anchor = allEvents[1]; + + const replay = listDashboardEventsSince(workspacePath, anchor.id); + 
expect(replay.map((event) => event.id)).toEqual( + allEvents.slice(2).map((event) => event.id), + ); + + const unknownReplay = listDashboardEventsSince(workspacePath, 'unknown-id'); + expect(unknownReplay.map((event) => event.id)).toEqual( + allEvents.map((event) => event.id), + ); + }); + + it('filters by event type, primitive type, and thread path', () => { + ledger.append(workspacePath, 'seed', 'create', 'threads/alpha.md', 'thread'); + ledger.append(workspacePath, 'seed', 'update', '.workgraph/runs/run_1', 'run', { + status: 'running', + }); + ledger.append(workspacePath, 'seed', 'update', 'conversations/alpha.md', 'conversation', { + status: 'active', + }); + + const threadFilter = createDashboardEventFilter({ + threads: ['alpha'], + }); + const threadEvents = listDashboardEventsSince(workspacePath, undefined, threadFilter); + expect(threadEvents.length).toBeGreaterThan(0); + expect(threadEvents.every((event) => event.path === 'threads/alpha.md')).toBe(true); + + const runFilter = createDashboardEventFilter({ + primitiveTypes: ['run'], + }); + const runEvents = listDashboardEventsSince(workspacePath, undefined, runFilter); + expect(runEvents.length).toBeGreaterThan(0); + expect(runEvents.some((event) => event.type === 'run.updated')).toBe(true); + expect(runEvents.every((event) => event.type === 'run.updated' || event.fields.type === 'run')).toBe(true); + + const conversationEventTypeFilter = createDashboardEventFilter({ + eventTypes: ['conversation.updated'], + }); + const conversationLifecycleEvents = listDashboardEventsSince( + workspacePath, + undefined, + conversationEventTypeFilter, + ); + expect(conversationLifecycleEvents.length).toBe(1); + expect(conversationLifecycleEvents[0].type).toBe('conversation.updated'); + }); +}); diff --git a/packages/control-api/src/server-events.ts b/packages/control-api/src/server-events.ts index be092ce..47e274c 100644 --- a/packages/control-api/src/server-events.ts +++ b/packages/control-api/src/server-events.ts @@ -1,6 
+1,7 @@ import { ledger as ledgerModule, type LedgerEntry, type LedgerOp } from '@versatly/workgraph-kernel'; const ledger = ledgerModule; +const EVENT_ID_DELIMITER = '#'; export type DashboardEventType = | 'thread.created' @@ -9,6 +10,9 @@ export type DashboardEventType = | 'thread.done' | 'thread.blocked' | 'thread.released' + | 'conversation.updated' + | 'plan-step.updated' + | 'run.updated' | 'ledger.appended' | 'primitive.changed'; @@ -21,70 +25,108 @@ export interface DashboardEvent { ts: string; } +export interface DashboardEventFilter { + eventTypes?: ReadonlySet<string>; + primitiveTypes?: ReadonlySet<string>; + threadPaths?: ReadonlySet<string>; +} + +export interface CreateDashboardEventFilterInput { + eventTypes?: Iterable<string>; + primitiveTypes?: Iterable<string>; + threads?: Iterable<string>; +} + +/** + * Deterministic event projection for dashboard consumers. + * + * Guarantees: + * - Event order follows ledger append order + stable projection order per ledger entry. + * - Event ids are deterministic and unique per projected event. + * - Replays are idempotent via `id` and `Last-Event-ID`. 
+ */ export function mapLedgerEntryToDashboardEvents(entry: LedgerEntry): DashboardEvent[] { - const id = readEventId(entry); + const entryId = readEntryId(entry); const base = { - id, path: entry.target, actor: entry.actor, ts: entry.ts, }; - const events: DashboardEvent[] = []; + const projected: Array<Omit<DashboardEvent, 'id'>> = []; + const pushEvent = (type: DashboardEventType, fields: Record<string, unknown>) => { + projected.push({ + ...base, + type, + fields, + }); + }; + if (entry.type === 'thread') { const threadEventType = toThreadEventType(entry.op); if (threadEventType) { - events.push({ - ...base, - type: threadEventType, - fields: deriveEventFields(entry), - }); + pushEvent(threadEventType, deriveEventFields(entry)); } } - if (shouldEmitPrimitiveChanged(entry)) { - events.push({ - ...base, - type: 'primitive.changed', - fields: { - op: entry.op, - type: entry.type, - ...sanitizeData(entry.data), - }, + const primitiveLifecycleType = toPrimitiveLifecycleEventType(entry); + if (primitiveLifecycleType) { + pushEvent(primitiveLifecycleType, { + op: entry.op, + type: entry.type, + ...sanitizeData(entry.data), }); } - events.push({ - ...base, - type: 'ledger.appended', - fields: { + if (shouldEmitPrimitiveChanged(entry)) { + pushEvent('primitive.changed', { op: entry.op, type: entry.type, ...sanitizeData(entry.data), - }, + }); + }; + + pushEvent('ledger.appended', { + op: entry.op, + type: entry.type, + ...sanitizeData(entry.data), }); - return events; + const slotByType = new Map<DashboardEventType, number>(); + return projected.map((event) => { + const slot = slotByType.get(event.type) ?? 0; + slotByType.set(event.type, slot + 1); + const slotName = slot === 0 ? 
event.type : `${event.type}.${slot + 1}`; + return { + id: composeEventId(entryId, slotName), + ...event, + }; + }); } export function listDashboardEventsSince( workspacePath: string, lastEventId: string | undefined, + filter?: DashboardEventFilter, ): DashboardEvent[] { - const entries = ledger.readAll(workspacePath); - const startIdx = resolveReplayStartIndex(entries, lastEventId); - return entries - .slice(startIdx) + const allEvents = ledger + .readAll(workspacePath) .flatMap((entry) => mapLedgerEntryToDashboardEvents(entry)); + const startIdx = resolveReplayStartIndex(allEvents, lastEventId); + const replay = allEvents.slice(startIdx); + if (!filter) return replay; + return replay.filter((event) => matchesDashboardEventFilter(event, filter)); } export function subscribeToDashboardEvents( workspacePath: string, onEvent: (event: DashboardEvent) => void, + filter?: DashboardEventFilter, ): () => void { return ledger.subscribe(workspacePath, (entry) => { const events = mapLedgerEntryToDashboardEvents(entry); for (const event of events) { + if (!matchesDashboardEventFilter(event, filter)) continue; onEvent(event); } }); @@ -92,6 +134,7 @@ export function subscribeToDashboardEvents( export function toSsePayload(event: DashboardEvent): string { const body = JSON.stringify({ + id: event.id, type: event.type, path: event.path, actor: event.actor, @@ -154,6 +197,38 @@ function shouldEmitPrimitiveChanged(entry: LedgerEntry): boolean { return isPrimitiveMutationOp(entry.op); } +export function createDashboardEventFilter(input: CreateDashboardEventFilterInput): DashboardEventFilter | undefined { + const eventTypes = normalizeStringSet(input.eventTypes); + const primitiveTypes = normalizeStringSet(input.primitiveTypes); + const threadPaths = normalizeThreadPathSet(input.threads); + if (!eventTypes && !primitiveTypes && !threadPaths) return undefined; + return { + ...(eventTypes ? { eventTypes } : {}), + ...(primitiveTypes ? { primitiveTypes } : {}), + ...(threadPaths ? 
{ threadPaths } : {}), + }; +} + +export function matchesDashboardEventFilter(event: DashboardEvent, filter: DashboardEventFilter | undefined): boolean { + if (!filter) return true; + if (filter.eventTypes && !filter.eventTypes.has(event.type.toLowerCase())) { + return false; + } + if (filter.primitiveTypes) { + const primitiveType = inferPrimitiveType(event)?.toLowerCase(); + if (!primitiveType || !filter.primitiveTypes.has(primitiveType)) { + return false; + } + } + if (filter.threadPaths) { + const eventThreadPath = normalizeThreadPath(event.path); + if (!eventThreadPath || !filter.threadPaths.has(eventThreadPath)) { + return false; + } + } + return true; +} + function isPrimitiveMutationOp(op: LedgerOp): boolean { return op === 'create' || op === 'update' || @@ -183,14 +258,28 @@ function toThreadEventType(op: LedgerOp): DashboardEventType | undefined { return undefined; } -function readEventId(entry: LedgerEntry): string { +function toPrimitiveLifecycleEventType(entry: LedgerEntry): DashboardEventType | undefined { + if (!entry.type || !isPrimitiveMutationOp(entry.op)) return undefined; + if (entry.type === 'conversation') return 'conversation.updated'; + if (entry.type === 'plan-step') return 'plan-step.updated'; + if (entry.type === 'run') return 'run.updated'; + return undefined; +} + +function readEntryId(entry: LedgerEntry): string { if (entry.hash) return entry.hash; return `${entry.ts}:${entry.actor}:${entry.op}:${entry.target}`; } -function resolveReplayStartIndex(entries: LedgerEntry[], lastEventId: string | undefined): number { +function composeEventId(entryId: string, slotName: string): string { + return `${entryId}${EVENT_ID_DELIMITER}${slotName}`; +} + +function resolveReplayStartIndex(events: DashboardEvent[], lastEventId: string | undefined): number { if (!lastEventId) return 0; - const idx = entries.findIndex((entry) => entry.hash === lastEventId); + const normalized = lastEventId.trim(); + if (!normalized) return 0; + const idx = 
events.findIndex((event) => event.id === normalized); if (idx < 0) return 0; return idx + 1; } @@ -199,3 +288,74 @@ function sanitizeData(data: Record<string, unknown> | undefined): Record<string, unknown> { return Object.fromEntries(Object.entries(data ?? {}).filter(([, value]) => value !== undefined)); } + +function inferPrimitiveType(event: DashboardEvent): string | undefined { + if (event.type.startsWith('thread.')) return 'thread'; + const fromFields = readNonEmptyString(event.fields.type); + if (fromFields) return fromFields; + const fromPath = primitiveTypeFromPath(event.path); + if (fromPath) return fromPath; + return undefined; +} + +function primitiveTypeFromPath(rawPath: string): string | undefined { + const normalized = String(rawPath).replace(/\\/g, '/').replace(/^\.\//, ''); + if (!normalized) return undefined; + if (normalized.startsWith('.workgraph/runs/')) return 'run'; + if (normalized.startsWith('.workgraph/')) return undefined; + const directory = normalized.split('/')[0]; + if (directory === 'threads') return 'thread'; + if (directory === 'conversations') return 'conversation'; + if (directory === 'plan-steps') return 'plan-step'; + return undefined; +} + +function normalizeStringSet(values: Iterable<string> | undefined): ReadonlySet<string> | undefined { + if (!values) return undefined; + const set = new Set<string>(); + for (const raw of values) { + const value = String(raw).trim().toLowerCase(); + if (!value) continue; + set.add(value); + } + return set.size > 0 ? set : undefined; +} + +function normalizeThreadPathSet(values: Iterable<string> | undefined): ReadonlySet<string> | undefined { + if (!values) return undefined; + const set = new Set<string>(); + for (const raw of values) { + const normalized = normalizeThreadPath(raw); + if (!normalized) continue; + set.add(normalized); + } + return set.size > 0 ? 
set : undefined; +} + +function normalizeThreadPath(rawPath: string): string | undefined { + const raw = String(rawPath).trim(); + if (!raw) return undefined; + const decoded = safeDecodeURIComponent(raw); + const trimmed = decoded.replace(/\\/g, '/').replace(/^\.\//, ''); + if (!trimmed) return undefined; + const withDirectory = trimmed.startsWith('threads/') + ? trimmed + : `threads/${trimmed}`; + return withDirectory.endsWith('.md') + ? withDirectory + : `${withDirectory}.md`; +} + +function safeDecodeURIComponent(value: string): string { + try { + return decodeURIComponent(value); + } catch { + return value; + } +} + +function readNonEmptyString(value: unknown): string | undefined { + if (typeof value !== 'string') return undefined; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} diff --git a/packages/control-api/src/server.test.ts b/packages/control-api/src/server.test.ts index 130e729..8a896a6 100644 --- a/packages/control-api/src/server.test.ts +++ b/packages/control-api/src/server.test.ts @@ -4,6 +4,7 @@ import os from 'node:os'; import path from 'node:path'; import { agent as agentModule, + ledger as ledgerModule, store as storeModule, thread as threadModule, workspace as workspaceModule, @@ -11,6 +12,7 @@ import { import { startWorkgraphServer } from './server.js'; const agent = agentModule; +const ledger = ledgerModule; const store = storeModule; const thread = threadModule; const workspace = workspaceModule; @@ -25,6 +27,137 @@ afterEach(() => { fs.rmSync(workspacePath, { recursive: true, force: true }); }); +interface SseEnvelope { + id: string; + type: string; + path: string; + actor: string; + fields: Record; + ts: string; +} + +interface ParsedSseEvent { + id: string; + event: string; + data: SseEnvelope; +} + +interface SseReader { + nextEvent: (timeoutMs?: number) => Promise; + close: () => Promise; +} + +async function openSseStream(url: string, init?: RequestInit): Promise { + const response = await 
fetch(url, init); + expect(response.status).toBe(200); + expect(response.body).toBeDefined(); + return createSseReader(response.body!); +} + +function createSseReader(stream: ReadableStream): SseReader { + const reader = stream.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + let ended = false; + + const nextEvent = async (timeoutMs: number = 4_000): Promise => { + const deadline = Date.now() + timeoutMs; + while (true) { + const parsed = tryParseFromBuffer(); + if (parsed) return parsed; + if (ended) { + throw new Error('SSE stream ended before next event.'); + } + const remainingMs = deadline - Date.now(); + if (remainingMs <= 0) { + throw new Error('Timed out waiting for SSE event.'); + } + const chunk = await withTimeout(reader.read(), remainingMs, 'Timed out waiting for SSE chunk.'); + if (chunk.done) { + ended = true; + buffer += decoder.decode(); + continue; + } + buffer += decoder.decode(chunk.value, { stream: true }); + } + }; + + const close = async () => { + ended = true; + try { + await reader.cancel(); + } catch { + // no-op + } + }; + + const tryParseFromBuffer = (): ParsedSseEvent | null => { + while (true) { + const boundaryIndex = buffer.indexOf('\n\n'); + if (boundaryIndex < 0) return null; + const block = buffer.slice(0, boundaryIndex); + buffer = buffer.slice(boundaryIndex + 2); + const parsed = parseSseBlock(block); + if (parsed) return parsed; + } + }; + + return { + nextEvent, + close, + }; +} + +function parseSseBlock(block: string): ParsedSseEvent | null { + const lines = block.split('\n').map((line) => line.replace(/\r$/, '')); + let id = ''; + let event = ''; + const dataLines: string[] = []; + for (const line of lines) { + if (!line || line.startsWith(':')) continue; + const separator = line.indexOf(':'); + if (separator < 0) continue; + const key = line.slice(0, separator).trim(); + const value = line.slice(separator + 1).trimStart(); + if (key === 'id') { + id = value; + continue; + } + if (key === 'event') { + 
event = value; + continue; + } + if (key === 'data') { + dataLines.push(value); + } + } + if (dataLines.length === 0) return null; + const data = JSON.parse(dataLines.join('\n')) as SseEnvelope; + return { + id: id || data.id, + event: event || data.type, + data, + }; +} + +async function withTimeout(promise: Promise, timeoutMs: number, message: string): Promise { + return await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + reject(new Error(message)); + }, timeoutMs); + promise.then( + (value) => { + clearTimeout(timer); + resolve(value); + }, + (error) => { + clearTimeout(timer); + reject(error); + }, + ); + }); +} + describe('workgraph server REST API', () => { it('serves /health endpoint', async () => { const handle = await startWorkgraphServer({ @@ -345,6 +478,151 @@ describe('workgraph server REST API', () => { } }); + it('replays missed thread events from Last-Event-ID with stable ordering and ids', async () => { + const handle = await startWorkgraphServer({ + workspacePath, + host: '127.0.0.1', + port: 0, + }); + const streams: SseReader[] = []; + try { + const createdThread = thread.createThread(workspacePath, 'SSE replay', 'Replay goal', 'seed'); + const streamUrl = `${handle.baseUrl}/api/events` + + `?thread=${encodeURIComponent(createdThread.path)}` + + '&event=thread.created&event=thread.claimed&event=thread.done'; + + const initialStream = await openSseStream(streamUrl); + streams.push(initialStream); + const createdEvent = await initialStream.nextEvent(); + expect(createdEvent.event).toBe('thread.created'); + expect(createdEvent.data.id).toBe(createdEvent.id); + expect(Object.keys(createdEvent.data)).toEqual(['id', 'type', 'path', 'actor', 'fields', 'ts']); + await initialStream.close(); + + thread.claim(workspacePath, createdThread.path, 'worker-a'); + thread.done( + workspacePath, + createdThread.path, + 'worker-a', + 'Completed in SSE replay test https://github.com/Versatly/workgraph/pull/1', + ); + + const replayStream = 
await openSseStream(streamUrl, { + headers: { + 'last-event-id': createdEvent.id, + }, + }); + streams.push(replayStream); + const firstMissed = await replayStream.nextEvent(); + const secondMissed = await replayStream.nextEvent(); + expect(firstMissed.event).toBe('thread.claimed'); + expect(secondMissed.event).toBe('thread.done'); + expect(firstMissed.id).not.toBe(secondMissed.id); + await replayStream.close(); + + const deterministicReplay = await openSseStream(streamUrl, { + headers: { + 'last-event-id': createdEvent.id, + }, + }); + streams.push(deterministicReplay); + const replayAgainFirst = await deterministicReplay.nextEvent(); + const replayAgainSecond = await deterministicReplay.nextEvent(); + expect([replayAgainFirst.id, replayAgainSecond.id]).toEqual([firstMissed.id, secondMissed.id]); + expect([replayAgainFirst.event, replayAgainSecond.event]).toEqual(['thread.claimed', 'thread.done']); + await deterministicReplay.close(); + } finally { + for (const stream of streams) { + await stream.close(); + } + await handle.close(); + } + }); + + it('supports primitive filters for conversation, plan-step, and run updates', async () => { + const handle = await startWorkgraphServer({ + workspacePath, + host: '127.0.0.1', + port: 0, + }); + const streams: SseReader[] = []; + try { + ledger.append(workspacePath, 'seed', 'update', 'conversations/sse.md', 'conversation', { + status: 'active', + }); + ledger.append(workspacePath, 'seed', 'update', 'plan-steps/sse.md', 'plan-step', { + status: 'active', + }); + ledger.append(workspacePath, 'seed', 'update', '.workgraph/runs/run_sse', 'run', { + status: 'running', + }); + + const conversationStream = await openSseStream( + `${handle.baseUrl}/api/events?primitive=conversation&event=conversation.updated`, + ); + streams.push(conversationStream); + const conversationEvent = await conversationStream.nextEvent(); + expect(conversationEvent.event).toBe('conversation.updated'); + 
expect(conversationEvent.data.path).toBe('conversations/sse.md'); + await conversationStream.close(); + + const stepStream = await openSseStream( + `${handle.baseUrl}/api/events?primitive=plan-step&event=plan-step.updated`, + ); + streams.push(stepStream); + const stepEvent = await stepStream.nextEvent(); + expect(stepEvent.event).toBe('plan-step.updated'); + expect(stepEvent.data.path).toBe('plan-steps/sse.md'); + await stepStream.close(); + + const runStream = await openSseStream( + `${handle.baseUrl}/api/events?primitive=run&event=run.updated`, + ); + streams.push(runStream); + const runEvent = await runStream.nextEvent(); + expect(runEvent.event).toBe('run.updated'); + expect(runEvent.data.path).toBe('.workgraph/runs/run_sse'); + await runStream.close(); + } finally { + for (const stream of streams) { + await stream.close(); + } + await handle.close(); + } + }); + + it('sends keepalive heartbeat comments for idle SSE streams', async () => { + const handle = await startWorkgraphServer({ + workspacePath, + host: '127.0.0.1', + port: 0, + sseKeepaliveMs: 100, + }); + try { + const response = await fetch(`${handle.baseUrl}/api/events?event=thread.done`); + expect(response.status).toBe(200); + expect(response.body).toBeDefined(); + const reader = response.body!.getReader(); + const decoder = new TextDecoder(); + let output = ''; + const deadline = Date.now() + 1_500; + while (Date.now() < deadline && !output.includes(':keepalive')) { + const remaining = deadline - Date.now(); + const chunk = await withTimeout( + reader.read(), + remaining, + 'Timed out waiting for SSE keepalive comment.', + ); + if (chunk.done) break; + output += decoder.decode(chunk.value, { stream: true }); + } + expect(output.includes(':keepalive')).toBe(true); + await reader.cancel(); + } finally { + await handle.close(); + } + }); + it('enforces strict credential identity for mutating REST endpoints', async () => { const init = workspace.initWorkspace(workspacePath, { createReadme: false, 
createBases: false }); const registration = agent.registerAgent(workspacePath, 'api-admin', { diff --git a/packages/control-api/src/server.ts b/packages/control-api/src/server.ts index 35f6d4b..dd7b263 100644 --- a/packages/control-api/src/server.ts +++ b/packages/control-api/src/server.ts @@ -19,6 +19,8 @@ import { buildTimelineLens, } from './server-lenses.js'; import { + createDashboardEventFilter, + type DashboardEvent, listDashboardEventsSince, subscribeToDashboardEvents, toSsePayload, @@ -45,7 +47,8 @@ const DEFAULT_LEDGER_LIMIT = 20; const DEFAULT_THREADS_LIMIT = 100; const MAX_LEDGER_LIMIT = 500; const MAX_THREADS_LIMIT = 1_000; -const SSE_KEEPALIVE_MS = 15_000; +const DEFAULT_SSE_KEEPALIVE_MS = 15_000; +const SSE_RETRY_MS = 3_000; type LogLevel = 'info' | 'warn' | 'error'; @@ -56,6 +59,7 @@ export interface WorkgraphServerOptions { bearerToken?: string; defaultActor?: string; endpointPath?: string; + sseKeepaliveMs?: number; } type PrimitiveInstance = any; @@ -113,6 +117,7 @@ export async function startWorkgraphServer(options: WorkgraphServerOptions): Pro const port = normalizePort(options.port, DEFAULT_PORT); const endpointPath = readNonEmptyString(options.endpointPath) ?? DEFAULT_ENDPOINT_PATH; const defaultActor = readNonEmptyString(options.defaultActor) ?? 
'anonymous'; + const sseKeepaliveMs = normalizeSseKeepaliveMs(options.sseKeepaliveMs); const workspaceInitialized = ensureWorkspaceInitialized(workspacePath); const unsubscribeWebhookDispatch = subscribeToDashboardEvents(workspacePath, (event) => { @@ -133,7 +138,7 @@ export async function startWorkgraphServer(options: WorkgraphServerOptions): Pro app.use('/api', (req: any, _res: any, next: () => void) => { auth.runWithAuthContext(buildRequestAuthContext(req), () => next()); }); - registerRestRoutes(app, workspacePath, defaultActor); + registerRestRoutes(app, workspacePath, defaultActor, sseKeepaliveMs); }, }); } catch (error) { @@ -227,13 +232,27 @@ export function loadServerOptionsFromEnv(env: NodeJS.ProcessEnv): WorkgraphServe bearerToken: readNonEmptyString(env.WORKGRAPH_BEARER_TOKEN), defaultActor: readNonEmptyString(env.WORKGRAPH_ACTOR) ?? 'anonymous', endpointPath: DEFAULT_ENDPOINT_PATH, + sseKeepaliveMs: parseOptionalPositiveInt(env.WORKGRAPH_SSE_KEEPALIVE_MS, { + max: 60_000, + }), }; } -function registerRestRoutes(app: any, workspacePath: string, defaultActor: string): void { +function registerRestRoutes( + app: any, + workspacePath: string, + defaultActor: string, + sseKeepaliveMs: number, +): void { app.get('/api/events', (req: any, res: any) => { try { - const lastEventId = readNonEmptyString(req.headers?.['last-event-id']); + const lastEventId = readNonEmptyString(req.headers?.['last-event-id']) + ?? 
readNonEmptyString(req.query?.lastEventId); + const filter = createDashboardEventFilter({ + eventTypes: readCsvQueryValues(req.query, ['event', 'events']), + primitiveTypes: readCsvQueryValues(req.query, ['primitive', 'primitiveType']), + threads: readCsvQueryValues(req.query, ['thread']), + }); res.setHeader('Content-Type', 'text/event-stream; charset=utf-8'); res.setHeader('Cache-Control', 'no-cache, no-transform'); res.setHeader('Connection', 'keep-alive'); @@ -242,31 +261,68 @@ function registerRestRoutes(app: any, workspacePath: string, defaultActor: strin res.flushHeaders(); } if (!safeStreamWrite(res, ':connected\n\n')) return; + if (!safeStreamWrite(res, `retry: ${SSE_RETRY_MS}\n\n`)) return; - const replay = listDashboardEventsSince(workspacePath, lastEventId); - for (const event of replay) { - if (!safeStreamWrite(res, toSsePayload(event))) { + let cleaned = false; + let streamReady = false; + let unsubscribe = () => {}; + let keepAlive: NodeJS.Timeout | undefined; + const queuedLiveEvents: DashboardEvent[] = []; + let dedupeDuringBootstrap = true; + const bootstrapDeliveredIds = new Set(); + + const cleanup = () => { + if (cleaned) return; + cleaned = true; + if (keepAlive) { + clearInterval(keepAlive); + } + unsubscribe(); + }; + + const emitEvent = (event: DashboardEvent): boolean => { + if (dedupeDuringBootstrap) { + if (bootstrapDeliveredIds.has(event.id)) return true; + bootstrapDeliveredIds.add(event.id); + } + if (safeStreamWrite(res, toSsePayload(event))) { + return true; + } + cleanup(); + return false; + }; + + unsubscribe = subscribeToDashboardEvents(workspacePath, (event) => { + if (!streamReady) { + queuedLiveEvents.push(event); return; } + emitEvent(event); + }, filter); + + const replay = listDashboardEventsSince(workspacePath, lastEventId, filter); + for (const event of replay) { + if (!emitEvent(event)) return; } - const unsubscribe = subscribeToDashboardEvents(workspacePath, (event) => { - safeStreamWrite(res, toSsePayload(event)); - 
}); - const keepAlive = setInterval(() => { - safeStreamWrite(res, ':keepalive\n\n'); - }, SSE_KEEPALIVE_MS); + while (queuedLiveEvents.length > 0) { + const event = queuedLiveEvents.shift(); + if (!event) break; + if (!emitEvent(event)) return; + } + streamReady = true; + dedupeDuringBootstrap = false; + bootstrapDeliveredIds.clear(); + + keepAlive = setInterval(() => { + if (!safeStreamWrite(res, `:keepalive ${Date.now()}\n\n`)) { + cleanup(); + } + }, sseKeepaliveMs); if (typeof keepAlive.unref === 'function') { keepAlive.unref(); } - let cleaned = false; - const cleanup = () => { - if (cleaned) return; - cleaned = true; - clearInterval(keepAlive); - unsubscribe(); - }; req.on('close', cleanup); req.on('aborted', cleanup); res.on('close', cleanup); @@ -717,6 +773,14 @@ function normalizePort(value: number | undefined, fallback: number): number { return Math.trunc(value); } +function normalizeSseKeepaliveMs(value: number | undefined): number { + if (value === undefined) return DEFAULT_SSE_KEEPALIVE_MS; + if (!Number.isFinite(value) || value < 100 || value > 60_000) { + throw new Error(`Invalid sse keepalive "${String(value)}". Expected 100..60000 ms.`); + } + return Math.trunc(value); +} + function parseStringList(value: unknown): string[] | undefined { if (value === undefined || value === null) return undefined; if (Array.isArray(value)) { @@ -732,6 +796,24 @@ function parseStringList(value: unknown): string[] | undefined { return fromString.length > 0 ? fromString : undefined; } +function readCsvQueryValues( + query: Record | undefined, + keys: string[], +): string[] | undefined { + if (!query) return undefined; + const values = new Set(); + for (const key of keys) { + const raw = query[key]; + if (raw === undefined || raw === null) continue; + const normalized = parseStringList(raw); + if (!normalized) continue; + for (const item of normalized) { + values.add(item); + } + } + return values.size > 0 ? 
[...values] : undefined; +} + function readNonEmptyString(value: unknown): string | undefined { const picked = readFirstValue(value); if (typeof picked !== 'string') return undefined;