diff --git a/docs/MCP_V2_COLLABORATION_PROTOCOL.md b/docs/MCP_V2_COLLABORATION_PROTOCOL.md new file mode 100644 index 0000000..2c3a75f --- /dev/null +++ b/docs/MCP_V2_COLLABORATION_PROTOCOL.md @@ -0,0 +1,153 @@ +# MCP v2 Collaboration Protocol (`wg_*` tools) + +WorkGraph MCP now exposes v2 collaboration tools that share a deterministic +machine-parseable response envelope: + +- `wg_post_message` +- `wg_ask` +- `wg_spawn_thread` +- `wg_heartbeat` + +## Deterministic envelope + +All v2 tools return `structuredContent` with one of these shapes: + +### Success envelope + +```json +{ + "ok": true, + "version": "2.0", + "tool": "wg_post_message", + "actor": "agent-name", + "data": { "...": "tool-specific payload" } +} +``` + +### Error envelope + +```json +{ + "ok": false, + "version": "2.0", + "tool": "wg_post_message", + "error": { + "code": "POLICY_DENIED", + "message": "Policy gate blocked MCP write.", + "retryable": false, + "details": {} + } +} +``` + +Error codes: + +- `BAD_INPUT` +- `NOT_FOUND` +- `POLICY_DENIED` +- `READ_ONLY` +- `IDEMPOTENCY_CONFLICT` +- `TIMEOUT` +- `INTERNAL_ERROR` + +## Tool contracts + +## `wg_post_message` + +Appends a structured conversation event tied to a thread. + +Input highlights: + +- `threadPath` (required) +- `body` (required) +- `messageType` (`message|note|decision|system|ask|reply`) +- `correlationId` / `replyToCorrelationId` +- `idempotencyKey` +- `evidence[]` (link/file metadata) +- `metadata` (object) + +Output highlights: + +- `operation` (`created|replayed`) +- `thread_path` +- `conversation_path` +- `event` (id, timestamps, correlation, evidence, metadata) +- `idempotency` (key + replay flag) + +## `wg_ask` + +Posts an ask event with correlation and optionally awaits/polls for reply. + +Input highlights: + +- `threadPath` (required) +- `question` (required) +- `correlationId` (optional; generated if omitted) +- `idempotencyKey` (optional) +- `awaitReply` + `timeoutMs` + `pollIntervalMs` + +Output highlights: + +- `operation` (`created|replayed`) +- `status` (`pending|answered`) +- `timed_out` +- `correlation_id` +- `ask` event + `reply` event (`null` if pending) + +## `wg_spawn_thread` + +Creates a child thread with inherited context from the parent. + +Input highlights: + +- `parentThreadPath`, `title`, `goal` (required) +- `priority`, `deps`, `tags`, `contextRefs`, `space` +- `idempotencyKey` + +Output highlights: + +- `operation` (`created|replayed`) +- `parent_thread_path` +- `thread` summary payload +- `idempotency` block + +## `wg_heartbeat` + +Writes agent presence heartbeat and thread claim heartbeat. + +Input highlights: + +- `actor` (optional; resolved from credential/default actor) +- `threadPath` (optional; all active/blocked owned threads if omitted) +- `threadLeaseMinutes` +- `status` (`online|busy|offline`) +- `currentWork` +- `capabilities` + +Output highlights: + +- `operation` (`updated`) +- `presence` summary +- `threads` heartbeat result (`touched` + `skipped`) + +## Idempotency semantics + +- `wg_post_message`: replay keyed by `idempotencyKey` in conversation events. +- `wg_ask`: replay keyed by `idempotencyKey`/`correlationId` for ask events while + still allowing fresh reply polling in subsequent calls. +- `wg_spawn_thread`: replay keyed by `idempotencyKey` on spawned child metadata. + +Reusing a key with different payload returns `IDEMPOTENCY_CONFLICT`. + +## SSE integration + +Collaboration writes emit SSE events through `/api/events` with explicit types: + +- `collaboration.message` +- `collaboration.ask` +- `collaboration.reply` +- `collaboration.heartbeat` + +Event IDs are now per-derived-event (`:`) so reconnect +replay with `Last-Event-ID` is safe even when multiple SSE events come from one +ledger append. diff --git a/packages/kernel/src/conversation.ts b/packages/kernel/src/conversation.ts index 6aef89a..6ae1936 100644 --- a/packages/kernel/src/conversation.ts +++ b/packages/kernel/src/conversation.ts @@ -2,6 +2,8 @@ * Conversation + plan-step coordination primitives. */ +import { createHash } from 'node:crypto'; +import * as ledger from './ledger.js'; import * as store from './store.js'; import type { ConversationStateSummary, @@ -14,14 +16,33 @@ import { PLAN_STEP_STATUS_TRANSITIONS, } from './types.js'; -export type ConversationEventKind = 'message' | 'note' | 'decision' | 'system'; +export type ConversationEventKind = 'message' | 'note' | 'decision' | 'system' | 'ask' | 'reply'; + +export type ConversationEvidenceKind = 'link' | 'file'; + +export interface ConversationEvidenceAttachment { + kind: ConversationEvidenceKind; + url?: string; + path?: string; + title?: string; + mime_type?: string; + size_bytes?: number; + sha256?: string; +} export interface ConversationEventRecord { + id?: string; ts: string; actor: string; kind: ConversationEventKind; + event_type?: string; message: string; thread_ref?: string; + correlation_id?: string; + reply_to?: string; + idempotency_key?: string; + evidence?: ConversationEvidenceAttachment[]; + metadata?: Record; } export interface CreateConversationOptions { @@ -33,7 +54,14 @@ export interface CreateConversationOptions { export interface AppendConversationMessageOptions { kind?: ConversationEventKind; + eventType?: string; + eventId?: string; threadRef?: string; + correlationId?: string; + replyTo?: string; + idempotencyKey?: string; + evidence?: ConversationEvidenceAttachment[]; + metadata?: Record; } export interface CreatePlanStepOptions { @@ -134,6 +162,11 @@ export function getConversation(workspacePath: string, conversationRef: string): }; } +export function listConversationEvents(workspacePath: string, conversationRef: string): ConversationEventRecord[] { + const conversation = readConversationOrThrow(workspacePath, conversationRef); + return coerceEventRecords(conversation.fields.events); +} + export function appendConversationMessage( workspacePath: string, conversationRef: string, @@ -146,12 +179,34 @@ export function appendConversationMessage( if (!trimmedMessage) { throw new Error('Conversation message text cannot be empty.'); } + const kind = normalizeConversationEventKind(options.kind) ?? 'message'; + const eventType = asOptionalString(options.eventType) ?? kind; + const correlationId = asOptionalString(options.correlationId); + const replyTo = asOptionalString(options.replyTo); + const idempotencyKey = asOptionalString(options.idempotencyKey); + const metadata = coerceMetadataRecord(options.metadata); + const evidence = coerceEvidenceAttachments(options.evidence); const event: ConversationEventRecord = { + id: asOptionalString(options.eventId) ?? mintConversationEventId({ + conversationPath: conversation.path, + actor, + kind, + message: trimmedMessage, + eventType, + correlationId, + idempotencyKey, + }), ts: new Date().toISOString(), actor, - kind: options.kind ?? 'message', + kind, + event_type: eventType, message: trimmedMessage, ...(options.threadRef ? { thread_ref: normalizeThreadRef(options.threadRef) } : {}), + ...(correlationId ? { correlation_id: correlationId } : {}), + ...(replyTo ? { reply_to: replyTo } : {}), + ...(idempotencyKey ? { idempotency_key: idempotencyKey } : {}), + ...(evidence.length > 0 ? { evidence } : {}), + ...(metadata ? { metadata } : {}), }; if (event.thread_ref) { assertThreadExists(workspacePath, event.thread_ref); @@ -175,6 +230,22 @@ export function appendConversationMessage( }), actor, ); + ledger.append(workspacePath, actor, 'update', updated.path, 'conversation', { + conversation_event: { + id: event.id, + ts: event.ts, + actor: event.actor, + kind: event.kind, + event_type: event.event_type, + message: event.message, + thread_ref: event.thread_ref, + correlation_id: event.correlation_id, + reply_to: event.reply_to, + idempotency_key: event.idempotency_key, + evidence: event.evidence, + metadata: event.metadata, + }, + }); const synced = syncConversationState(workspacePath, updated.path, actor); return { conversation: synced, @@ -679,6 +750,22 @@ function normalizePlanStepStatus(value: unknown): PlanStepStatus { return 'open'; } +function normalizeConversationEventKind(value: unknown): ConversationEventKind | undefined { + const normalized = String(value ?? '').trim().toLowerCase(); + if (!normalized) return undefined; + if ( + normalized === 'message' || + normalized === 'note' || + normalized === 'decision' || + normalized === 'system' || + normalized === 'ask' || + normalized === 'reply' + ) { + return normalized; + } + return undefined; +} + function comparePlanSteps(left: PrimitiveInstance, right: PrimitiveInstance): number { const byOrder = toFiniteNumber(left.fields.order, Number.MAX_SAFE_INTEGER) - toFiniteNumber(right.fields.order, Number.MAX_SAFE_INTEGER); if (byOrder !== 0) return byOrder; @@ -717,23 +804,92 @@ function coerceEventRecords(value: unknown): ConversationEventRecord[] { for (const item of value) { if (!item || typeof item !== 'object') continue; const record = item as Record; + const id = asOptionalString(record.id); const ts = asOptionalString(record.ts); const actor = asOptionalString(record.actor); - const kind = asOptionalString(record.kind) as ConversationEventKind | undefined; + const kind = normalizeConversationEventKind(record.kind); + const eventType = asOptionalString(record.event_type); const message = asOptionalString(record.message); if (!ts || !actor || !message) continue; - if (kind !== 'message' && kind !== 'note' && kind !== 'decision' && kind !== 'system') continue; + if (!kind) continue; + const metadata = coerceMetadataRecord(record.metadata); + const evidence = coerceEvidenceAttachments(record.evidence); records.push({ + ...(id ? { id } : {}), ts, actor, kind, + ...(eventType ? { event_type: eventType } : {}), message, ...(asOptionalString(record.thread_ref) ? { thread_ref: normalizeThreadRef(record.thread_ref) } : {}), + ...(asOptionalString(record.correlation_id) ? { correlation_id: asOptionalString(record.correlation_id) } : {}), + ...(asOptionalString(record.reply_to) ? { reply_to: asOptionalString(record.reply_to) } : {}), + ...(asOptionalString(record.idempotency_key) ? { idempotency_key: asOptionalString(record.idempotency_key) } : {}), + ...(evidence.length > 0 ? { evidence } : {}), + ...(metadata ? { metadata } : {}), }); } return records; } +function coerceEvidenceAttachments(value: unknown): ConversationEvidenceAttachment[] { + if (!Array.isArray(value)) return []; + const output: ConversationEvidenceAttachment[] = []; + for (const item of value) { + if (!item || typeof item !== 'object' || Array.isArray(item)) continue; + const record = item as Record; + const kind = normalizeEvidenceKind(record.kind); + if (!kind) continue; + const url = asOptionalString(record.url); + const filePath = asOptionalString(record.path); + if (!url && !filePath) continue; + const sizeBytes = toFiniteNumber(record.size_bytes, -1); + output.push({ + kind, + ...(url ? { url } : {}), + ...(filePath ? { path: filePath } : {}), + ...(asOptionalString(record.title) ? { title: asOptionalString(record.title) } : {}), + ...(asOptionalString(record.mime_type) ? { mime_type: asOptionalString(record.mime_type) } : {}), + ...(sizeBytes >= 0 ? { size_bytes: Math.round(sizeBytes) } : {}), + ...(asOptionalString(record.sha256) ? { sha256: asOptionalString(record.sha256) } : {}), + }); + } + return output; +} + +function normalizeEvidenceKind(value: unknown): ConversationEvidenceKind | undefined { + const normalized = String(value ?? '').trim().toLowerCase(); + if (normalized === 'link' || normalized === 'file') return normalized; + return undefined; +} + +function coerceMetadataRecord(value: unknown): Record | undefined { + if (!value || typeof value !== 'object' || Array.isArray(value)) return undefined; + const normalized = Object.fromEntries( + Object.entries(value) + .filter(([key]) => key.trim().length > 0) + .map(([key, item]) => [key, normalizeMetadataValue(item)]), + ); + return Object.keys(normalized).length > 0 ? normalized : undefined; +} + +function normalizeMetadataValue(value: unknown): unknown { + if (value === null) return null; + if (typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') { + return value; + } + if (Array.isArray(value)) { + return value.map((item) => normalizeMetadataValue(item)); + } + if (typeof value === 'object') { + const record = value as Record; + return Object.fromEntries( + Object.entries(record).map(([key, item]) => [key, normalizeMetadataValue(item)]), + ); + } + return String(value); +} + function uniqueRefs(values: string[]): string[] { const deduped = new Set(); for (const value of values) { @@ -810,13 +966,42 @@ function renderConversationBody(input: { lines.push('- none'); } else { for (const event of recentEvents) { - lines.push(`- ${event.ts} [${event.kind}] ${event.actor}${event.thread_ref ? ` thread=${event.thread_ref}` : ''}: ${event.message}`); + const marker = event.event_type ?? event.kind; + const details = [ + event.thread_ref ? `thread=${event.thread_ref}` : '', + event.correlation_id ? `correlation=${event.correlation_id}` : '', + event.reply_to ? `reply_to=${event.reply_to}` : '', + event.evidence && event.evidence.length > 0 ? `evidence=${event.evidence.length}` : '', + ].filter((entry) => entry.length > 0); + lines.push(`- ${event.ts} [${marker}] ${event.actor}${details.length > 0 ? ` (${details.join(', ')})` : ''}: ${event.message}`); } } lines.push(''); return lines.join('\n'); } +function mintConversationEventId(input: { + conversationPath: string; + actor: string; + kind: ConversationEventKind; + eventType: string; + message: string; + correlationId?: string; + idempotencyKey?: string; +}): string { + const seed = stableStringify({ + conversation: input.conversationPath, + actor: input.actor, + kind: input.kind, + eventType: input.eventType, + message: input.message, + correlationId: input.correlationId, + idempotencyKey: input.idempotencyKey, + now: new Date().toISOString(), + }); + return `evt_${createHash('sha1').update(seed).digest('hex').slice(0, 16)}`; +} + function renderPlanStepBody(input: { title: string; status: PlanStepStatus; diff --git a/packages/kernel/src/mcp-events.test.ts b/packages/kernel/src/mcp-events.test.ts index fa15584..5bfc3b1 100644 --- a/packages/kernel/src/mcp-events.test.ts +++ b/packages/kernel/src/mcp-events.test.ts @@ -87,6 +87,52 @@ describe('mcp-events core module', () => { ]); }); + it('maps collaboration ask/reply and heartbeat events', () => { + const ask = mapLedgerEntryToSseEvents(entry({ + op: 'update', + target: 'conversations/coordination.md', + type: 'conversation', + data: { + conversation_event: { + event_type: 'ask', + correlation_id: 'corr-1', + message: 'Need update', + }, + }, + })); + expect(ask.some((event) => event.type === 'collaboration.ask')).toBe(true); + expect(ask.some((event) => event.type === 'primitive.updated')).toBe(true); + + const reply = mapLedgerEntryToSseEvents(entry({ + op: 'update', + target: 'conversations/coordination.md', + type: 'conversation', + data: { + conversation_event: { + event_type: 'reply', + reply_to: 'corr-1', + message: 'Shipped', + }, + }, + })); + expect(reply.some((event) => event.type === 'collaboration.reply')).toBe(true); + expect(reply.some((event) => event.type === 'primitive.updated')).toBe(true); + + const threadHeartbeat = mapLedgerEntryToSseEvents(entry({ + op: 'heartbeat', + target: 'threads/coordination.md', + type: 'thread', + data: { + ttl_minutes: 15, + }, + })); + expect(threadHeartbeat).toEqual([ + expect.objectContaining({ + type: 'collaboration.heartbeat', + }), + ]); + }); + it('suppresses primitive CRUD events for ledger internals and untyped entries', () => { const ledgerInternal = mapLedgerEntryToSseEvents(entry({ op: 'create', diff --git a/packages/kernel/src/mcp-events.ts b/packages/kernel/src/mcp-events.ts index dada26a..2af7676 100644 --- a/packages/kernel/src/mcp-events.ts +++ b/packages/kernel/src/mcp-events.ts @@ -17,7 +17,17 @@ export interface WorkgraphEventStreamOptions { } export interface WorkgraphSseEvent { - type: 'primitive.created' | 'primitive.updated' | 'primitive.deleted' | 'thread.claimed' | 'thread.completed' | 'trigger.fired'; + type: + | 'primitive.created' + | 'primitive.updated' + | 'primitive.deleted' + | 'thread.claimed' + | 'thread.completed' + | 'trigger.fired' + | 'collaboration.message' + | 'collaboration.ask' + | 'collaboration.reply' + | 'collaboration.heartbeat'; primitive: string; timestamp: string; actor: string; @@ -189,6 +199,28 @@ export function mapLedgerEntryToSseEvents(entry: LedgerEntry): WorkgraphSseEvent events.push({ ...base, type: 'trigger.fired' }); } + if (entry.type === 'conversation' && entry.op === 'update') { + const conversationEvent = toRecord(entry.data?.conversation_event); + if (conversationEvent) { + const eventKind = String(conversationEvent.event_type ?? conversationEvent.kind ?? 'message').trim().toLowerCase(); + events.push({ + ...base, + type: eventKind === 'ask' + ? 'collaboration.ask' + : eventKind === 'reply' + ? 'collaboration.reply' + : 'collaboration.message', + }); + } + } + + if ( + (entry.type === 'thread' && entry.op === 'heartbeat') || + (entry.type === 'presence' && entry.op === 'update' && isPresenceHeartbeat(entry)) + ) { + events.push({ ...base, type: 'collaboration.heartbeat' }); + } + return events; } @@ -205,6 +237,17 @@ function primitiveSlugFromTarget(target: string): string { return basename.endsWith('.md') ? basename.slice(0, -3) : basename; } +function isPresenceHeartbeat(entry: LedgerEntry): boolean { + const changed = entry.data?.changed; + if (!Array.isArray(changed)) return false; + return changed.some((field) => String(field).trim() === 'last_seen'); +} + +function toRecord(value: unknown): Record | null { + if (!value || typeof value !== 'object' || Array.isArray(value)) return null; + return value as Record; +} + function normalizeSsePath(pathValue: string | undefined): string { if (!pathValue) return DEFAULT_SSE_PATH; const trimmed = pathValue.trim(); diff --git a/packages/mcp-server/src/mcp-server.test.ts b/packages/mcp-server/src/mcp-server.test.ts index 2339923..064c71c 100644 --- a/packages/mcp-server/src/mcp-server.test.ts +++ b/packages/mcp-server/src/mcp-server.test.ts @@ -206,6 +206,10 @@ describe('workgraph mcp server', () => { 'workgraph_dispatch_stop', 'workgraph_trigger_engine_cycle', 'workgraph_autonomy_run', + 'wg_post_message', + 'wg_ask', + 'wg_spawn_thread', + 'wg_heartbeat', ]; for (const name of expectedTools) { expect(toolNames.has(name)).toBe(true); @@ -218,6 +222,237 @@ describe('workgraph mcp server', () => { await server.close(); } }); + + it('supports deterministic v2 collaboration tools with schema/auth/idempotency handling', async () => { + const parent = thread.createThread( + workspacePath, + 'Parent coordination', + 'Coordinate collaboration flow', + 'seed-agent', + ); + const server = createWorkgraphMcpServer({ + workspacePath, + defaultActor: 'agent-v2', + }); + const client = new Client({ + name: 'workgraph-mcp-test-client-v2', + version: '1.0.0', + }); + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + + await Promise.all([ + server.connect(serverTransport), + client.connect(clientTransport), + ]); + + try { + let schemaRejected = false; + try { + const schemaResult = await client.callTool({ + name: 'wg_post_message', + arguments: { + threadPath: parent.path, + body: 'should fail schema', + messageType: 'invalid-message-type', + }, + }); + schemaRejected = isToolError(schemaResult); + } catch { + schemaRejected = true; + } + expect(schemaRejected).toBe(true); + + const denied = await client.callTool({ + name: 'wg_post_message', + arguments: { + threadPath: parent.path, + body: 'Auth denied message', + idempotencyKey: 'post-denied-key', + }, + }); + expect(isToolError(denied)).toBe(true); + const deniedPayload = getStructured<{ ok: boolean; error: { code: string } }>(denied); + expect(deniedPayload.ok).toBe(false); + expect(deniedPayload.error.code).toBe('POLICY_DENIED'); + + policy.upsertParty(workspacePath, 'agent-v2', { + roles: ['operator'], + capabilities: ['mcp:write', 'thread:update', 'thread:create', 'agent:heartbeat'], + }); + + const posted = await client.callTool({ + name: 'wg_post_message', + arguments: { + threadPath: parent.path, + body: 'Coordination message', + messageType: 'message', + idempotencyKey: 'post-idem-key', + evidence: [ + { + kind: 'link', + url: 'https://github.com/versatly/workgraph/pull/999', + title: 'PR evidence', + }, + ], + metadata: { + source: 'test', + attempt: 1, + }, + }, + }); + expect(isToolError(posted)).toBe(false); + const postedPayload = getStructured<{ + ok: boolean; + data: { operation: string; event: { id: string } }; + }>(posted); + expect(postedPayload.ok).toBe(true); + expect(postedPayload.data.operation).toBe('created'); + + const postReplay = await client.callTool({ + name: 'wg_post_message', + arguments: { + threadPath: parent.path, + body: 'Coordination message', + messageType: 'message', + idempotencyKey: 'post-idem-key', + evidence: [ + { + kind: 'link', + url: 'https://github.com/versatly/workgraph/pull/999', + title: 'PR evidence', + }, + ], + metadata: { + source: 'test', + attempt: 1, + }, + }, + }); + expect(isToolError(postReplay)).toBe(false); + const replayPayload = getStructured<{ + data: { operation: string; event: { id: string } }; + }>(postReplay); + expect(replayPayload.data.operation).toBe('replayed'); + expect(replayPayload.data.event.id).toBe(postedPayload.data.event.id); + + const postConflict = await client.callTool({ + name: 'wg_post_message', + arguments: { + threadPath: parent.path, + body: 'Changed body should conflict', + messageType: 'message', + idempotencyKey: 'post-idem-key', + }, + }); + expect(isToolError(postConflict)).toBe(true); + const conflictPayload = getStructured<{ ok: boolean; error: { code: string } }>(postConflict); + expect(conflictPayload.ok).toBe(false); + expect(conflictPayload.error.code).toBe('IDEMPOTENCY_CONFLICT'); + + const asked = await client.callTool({ + name: 'wg_ask', + arguments: { + threadPath: parent.path, + question: 'Can you provide a status update?', + idempotencyKey: 'ask-idem-key', + awaitReply: false, + }, + }); + expect(isToolError(asked)).toBe(false); + const askPayload = getStructured<{ + data: { + operation: string; + status: string; + correlation_id: string; + ask: { id: string }; + }; + }>(asked); + expect(askPayload.data.operation).toBe('created'); + expect(askPayload.data.status).toBe('pending'); + + const askReplay = await client.callTool({ + name: 'wg_ask', + arguments: { + threadPath: parent.path, + question: 'Can you provide a status update?', + idempotencyKey: 'ask-idem-key', + awaitReply: false, + }, + }); + expect(isToolError(askReplay)).toBe(false); + const askReplayPayload = getStructured<{ + data: { + operation: string; + correlation_id: string; + ask: { id: string }; + }; + }>(askReplay); + expect(askReplayPayload.data.operation).toBe('replayed'); + expect(askReplayPayload.data.correlation_id).toBe(askPayload.data.correlation_id); + expect(askReplayPayload.data.ask.id).toBe(askPayload.data.ask.id); + + const spawned = await client.callTool({ + name: 'wg_spawn_thread', + arguments: { + parentThreadPath: parent.path, + title: 'Child coordination task', + goal: 'Implement child flow', + idempotencyKey: 'spawn-idem-key', + tags: ['coordination'], + contextRefs: ['spaces/platform.md'], + }, + }); + expect(isToolError(spawned)).toBe(false); + const spawnedPayload = getStructured<{ + data: { operation: string; thread: { path: string } }; + }>(spawned); + expect(spawnedPayload.data.operation).toBe('created'); + + const spawnReplay = await client.callTool({ + name: 'wg_spawn_thread', + arguments: { + parentThreadPath: parent.path, + title: 'Child coordination task', + goal: 'Implement child flow', + idempotencyKey: 'spawn-idem-key', + tags: ['coordination'], + contextRefs: ['spaces/platform.md'], + }, + }); + expect(isToolError(spawnReplay)).toBe(false); + const spawnReplayPayload = getStructured<{ + data: { operation: string; thread: { path: string } }; + }>(spawnReplay); + expect(spawnReplayPayload.data.operation).toBe('replayed'); + expect(spawnReplayPayload.data.thread.path).toBe(spawnedPayload.data.thread.path); + + const heartbeatResult = await client.callTool({ + name: 'wg_heartbeat', + arguments: { + actor: 'agent-v2', + status: 'busy', + currentWork: parent.path, + threadPath: parent.path, + threadLeaseMinutes: 20, + }, + }); + expect(isToolError(heartbeatResult)).toBe(false); + const heartbeatPayload = getStructured<{ + data: { + operation: string; + presence: { status: string }; + threads: { touched: unknown[]; skipped: unknown[] }; + }; + }>(heartbeatResult); + expect(heartbeatPayload.data.operation).toBe('updated'); + expect(heartbeatPayload.data.presence.status).toBe('busy'); + expect(Array.isArray(heartbeatPayload.data.threads.touched)).toBe(true); + expect(Array.isArray(heartbeatPayload.data.threads.skipped)).toBe(true); + } finally { + await client.close(); + await server.close(); + } + }); }); function getStructured(result: unknown): T { @@ -236,67 +471,3 @@ function isToolError(result: unknown): boolean { if (!('isError' in result)) return false; return (result as { isError?: boolean }).isError === true; } - -async function readSseEvents( - reader: ReadableStreamDefaultReader, - minEvents: number, - timeoutMs: number, -): Promise> { - const decoder = new TextDecoder(); - let buffer = ''; - const events: Array<{ type: string; data: unknown }> = []; - const deadline = Date.now() + timeoutMs; - - while (events.length < minEvents && Date.now() < deadline) { - const remaining = deadline - Date.now(); - const nextChunk = await Promise.race([ - reader.read(), - new Promise((_, reject) => - setTimeout(() => reject(new Error('Timed out waiting for SSE events.')), remaining)), - ]); - if (nextChunk.done) break; - buffer += decoder.decode(nextChunk.value, { stream: true }); - - let separatorIndex = buffer.indexOf('\n\n'); - while (separatorIndex !== -1) { - const rawEvent = buffer.slice(0, separatorIndex); - buffer = buffer.slice(separatorIndex + 2); - const parsed = parseSseEvent(rawEvent); - if (parsed) events.push(parsed); - separatorIndex = buffer.indexOf('\n\n'); - } - } - - if (events.length < minEvents) { - throw new Error(`Expected at least ${minEvents} SSE events, received ${events.length}.`); - } - - return events; -} - -function parseSseEvent(rawEvent: string): { type: string; data: unknown } | null { - const lines = rawEvent - .split('\n') - .map((line) => line.trim()) - .filter((line) => line.length > 0 && !line.startsWith(':')); - if (lines.length === 0) return null; - - let type = 'message'; - let dataLine = ''; - for (const line of lines) { - if (line.startsWith('event:')) { - type = line.slice('event:'.length).trim(); - } else if (line.startsWith('data:')) { - dataLine += line.slice('data:'.length).trim(); - } - } - if (!dataLine) return null; - try { - return { - type, - data: JSON.parse(dataLine), - }; - } catch { - return null; - } -} diff --git a/packages/mcp-server/src/mcp-server.ts b/packages/mcp-server/src/mcp-server.ts index 1e4fad7..835c90a 100644 --- a/packages/mcp-server/src/mcp-server.ts +++ b/packages/mcp-server/src/mcp-server.ts @@ -1,5 +1,6 @@ import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import { registerCollaborationTools } from './mcp/tools/collaboration-tools.js'; import { registerResources } from './mcp/resources.js'; import { registerReadTools } from './mcp/tools/read-tools.js'; import { registerWriteTools } from './mcp/tools/write-tools.js'; @@ -24,6 +25,7 @@ export function createWorkgraphMcpServer(options: WorkgraphMcpServerOptions): Mc registerResources(server, options); registerReadTools(server, options); registerWriteTools(server, options); + registerCollaborationTools(server, options); return server; } diff --git a/packages/mcp-server/src/mcp/result.ts b/packages/mcp-server/src/mcp/result.ts index f5fd768..7f12390 100644 --- a/packages/mcp-server/src/mcp/result.ts +++ b/packages/mcp-server/src/mcp/result.ts @@ -25,6 +25,105 @@ export function errorResult(error: unknown) { }; } +export type CollaborationToolName = + | 'wg_post_message' + | 'wg_ask' + | 'wg_spawn_thread' + | 'wg_heartbeat'; + +export interface CollaborationToolError { + code: + | 'BAD_INPUT' + | 'NOT_FOUND' + | 'POLICY_DENIED' + | 'READ_ONLY' + | 'IDEMPOTENCY_CONFLICT' + | 'TIMEOUT' + | 'INTERNAL_ERROR'; + message: string; + retryable: boolean; + details?: Record; +} + +export interface CollaborationToolSuccessEnvelope { + ok: true; + version: '2.0'; + tool: CollaborationToolName; + actor: string; + data: TData; +} + +export interface CollaborationToolErrorEnvelope { + ok: false; + version: '2.0'; + tool: CollaborationToolName; + error: CollaborationToolError; +} + +export class McpToolError extends Error { + code: CollaborationToolError['code']; + retryable: boolean; + details?: Record; + + constructor( + code: CollaborationToolError['code'], + message: string, + options: { + retryable?: boolean; + details?: Record; + } = {}, + ) { + super(message); + this.name = 'McpToolError'; + this.code = code; + this.retryable = options.retryable ?? false; + this.details = options.details; + } +} + +export function collaborationOkResult( + tool: CollaborationToolName, + actor: string, + data: TData, +) { + const envelope: CollaborationToolSuccessEnvelope = { + ok: true, + version: '2.0', + tool, + actor, + data, + }; + return { + content: [ + { + type: 'text' as const, + text: toPrettyJson(envelope), + }, + ], + structuredContent: envelope as unknown as Record, + }; +} + +export function collaborationErrorResult(tool: CollaborationToolName, error: unknown) { + const normalized = toCollaborationToolError(error); + const envelope: CollaborationToolErrorEnvelope = { + ok: false, + version: '2.0', + tool, + error: normalized, + }; + return { + isError: true, + content: [ + { + type: 'text' as const, + text: `[${normalized.code}] ${normalized.message}`, + }, + ], + structuredContent: envelope as unknown as Record, + }; +} + export function toPrettyJson(value: unknown): string { return JSON.stringify(value, null, 2); } @@ -36,3 +135,43 @@ export function renderStatusSummary(snapshot: ReturnType { + const hasUrl = typeof value.url === 'string' && value.url.trim().length > 0; + const hasPath = typeof value.path === 'string' && value.path.trim().length > 0; + if (value.kind === 'link') return hasUrl; + if (value.kind === 'file') return hasPath; + return false; + }, + { + message: 'Evidence item must provide url for link kind or path for file kind.', + }, +); + +const metadataSchema = z + .record(z.string(), z.unknown()) + .describe('Arbitrary machine-readable metadata preserved with the event.'); + +export function registerCollaborationTools(server: McpServer, options: WorkgraphMcpServerOptions): void { + server.registerTool( + 'wg_post_message', + { + title: 'WorkGraph Post Message', + description: 'Append a structured collaboration message event to a thread conversation.', + inputSchema: { + threadPath: z.string().min(1).describe('Target thread path (threads/.md).'), + actor: z.string().optional().describe('Actor identity for write attribution.'), + conversationPath: z.string().optional().describe('Optional existing conversation path.'), + body: z.string().min(1).describe('Message body text to append.'), + messageType: z.enum(MESSAGE_TYPES).optional().describe('Conversation event type/kind.'), + correlationId: z.string().optional().describe('Correlation ID for ask/reply coordination.'), + replyToCorrelationId: z.string().optional().describe('Correlation ID this reply responds to.'), + idempotencyKey: z.string().optional().describe('Stable idempotency key for retry-safe writes.'), + evidence: z.array(evidenceAttachmentSchema).optional().describe('Optional evidence attachment descriptors.'), + metadata: metadataSchema.optional(), + }, + annotations: { + destructiveHint: true, + idempotentHint: false, + }, + }, + async (args) => { + try { + const actor = resolveActor(options.workspacePath, args.actor, options.defaultActor); + assertWriteAllowed(options, actor, ['thread:update', 'mcp:write'], { + action: 'mcp.collaboration.post-message', + target: normalizeThreadPath(args.threadPath), + }); + const threadPath = assertThreadExists(options.workspacePath, args.threadPath); + const conversationPath = resolveConversationPath(options.workspacePath, actor, threadPath, args.conversationPath); + const idempotencyKey = normalizeOptionalString(args.idempotencyKey); + const messageType = args.messageType ?? 'message'; + const correlationId = normalizeOptionalString(args.correlationId); + const replyToCorrelationId = normalizeOptionalString(args.replyToCorrelationId); + if (messageType === 'reply' && !replyToCorrelationId) { + throw new McpToolError('BAD_INPUT', 'Reply message requires replyToCorrelationId.'); + } + if (messageType === 'ask' && !correlationId) { + throw new McpToolError('BAD_INPUT', 'Ask message requires correlationId.'); + } + const existing = idempotencyKey + ? findEventByIdempotencyKey(options.workspacePath, conversationPath, idempotencyKey) + : null; + if (existing) { + assertPostReplayCompatible(existing, { + threadPath, + body: args.body, + messageType, + correlationId, + replyToCorrelationId, + evidence: args.evidence, + metadata: args.metadata, + }); + return collaborationOkResult('wg_post_message', actor, { + operation: 'replayed', + thread_path: threadPath, + conversation_path: conversationPath, + idempotency: { + key: idempotencyKey, + replayed: true, + }, + event: serializeEvent(existing), + }); + } + const eventId = mintEventId('msg', { + threadPath, + actor, + messageType, + correlationId, + idempotencyKey, + message: args.body, + }); + const appended = conversation.appendConversationMessage( + options.workspacePath, + conversationPath, + actor, + args.body, + { + kind: messageType, + eventType: messageType, + eventId, + threadRef: threadPath, + correlationId, + replyTo: replyToCorrelationId, + idempotencyKey, + evidence: args.evidence, + metadata: args.metadata, + }, + ); + const appendedEvent = findEventById(options.workspacePath, appended.conversation.path, eventId) + ?? lastConversationEvent(options.workspacePath, appended.conversation.path); + if (!appendedEvent) { + throw new McpToolError('INTERNAL_ERROR', 'Post-message completed without a persisted conversation event.'); + } + return collaborationOkResult('wg_post_message', actor, { + operation: 'created', + thread_path: threadPath, + conversation_path: appended.conversation.path, + idempotency: { + key: idempotencyKey, + replayed: false, + }, + event: serializeEvent(appendedEvent), + }); + } catch (error) { + return collaborationErrorResult('wg_post_message', error); + } + }, + ); + + server.registerTool( + 'wg_ask', + { + title: 'WorkGraph Ask', + description: 'Post a correlated question and optionally await/poll for a reply.', + inputSchema: { + threadPath: z.string().min(1).describe('Target thread path (threads/.md).'), + actor: z.string().optional().describe('Actor identity for write attribution.'), + conversationPath: z.string().optional().describe('Optional existing conversation path.'), + question: z.string().min(1).describe('Question text to post.'), + correlationId: z.string().optional().describe('Optional correlation ID (generated when omitted).'), + idempotencyKey: z.string().optional().describe('Stable idempotency key for retry-safe asks.'), + evidence: z.array(evidenceAttachmentSchema).optional().describe('Optional evidence attachments for ask context.'), + metadata: metadataSchema.optional(), + awaitReply: z.boolean().optional().describe('Whether the tool should wait for a matching reply event.'), + timeoutMs: z.number().int().min(0).max(120_000).optional().describe('Reply wait timeout when awaitReply=true.'), + pollIntervalMs: z.number().int().min(25).max(5_000).optional().describe('Polling interval used while awaiting reply.'), + }, + annotations: { + destructiveHint: true, + idempotentHint: false, + }, + }, + async (args) => { + try { + const actor = resolveActor(options.workspacePath, args.actor, options.defaultActor); + assertWriteAllowed(options, actor, ['thread:update', 'mcp:write'], { + action: 'mcp.collaboration.ask', + target: normalizeThreadPath(args.threadPath), + }); + const threadPath = assertThreadExists(options.workspacePath, args.threadPath); + const conversationPath = resolveConversationPath(options.workspacePath, actor, threadPath, args.conversationPath); + const idempotencyKey = normalizeOptionalString(args.idempotencyKey); + const initialCorrelation = normalizeOptionalString(args.correlationId) + ?? (idempotencyKey ? correlationIdFromIdempotency(idempotencyKey) : undefined) + ?? correlationIdFromQuestion(threadPath, args.question); + const postResult = ensureAskEvent(options.workspacePath, { + actor, + threadPath, + conversationPath, + question: args.question, + idempotencyKey, + correlationId: initialCorrelation, + evidence: args.evidence, + metadata: args.metadata, + }); + const correlationId = postResult.correlationId; + const shouldAwaitReply = args.awaitReply === true; + const timeoutMs = args.timeoutMs ?? 30_000; + const pollIntervalMs = args.pollIntervalMs ?? 250; + const startedAt = Date.now(); + let replyEvent = findLatestReplyEvent(options.workspacePath, conversationPath, correlationId); + while (!replyEvent && shouldAwaitReply && Date.now() - startedAt < timeoutMs) { + await sleep(pollIntervalMs); + replyEvent = findLatestReplyEvent(options.workspacePath, conversationPath, correlationId); + } + const waitedMs = Date.now() - startedAt; + return collaborationOkResult('wg_ask', actor, { + operation: postResult.replayed ? 'replayed' : 'created', + status: replyEvent ? 'answered' : 'pending', + timed_out: shouldAwaitReply && !replyEvent && waitedMs >= timeoutMs, + waited_ms: waitedMs, + thread_path: threadPath, + conversation_path: conversationPath, + correlation_id: correlationId, + idempotency: { + key: idempotencyKey, + replayed: postResult.replayed, + }, + ask: serializeEvent(postResult.askEvent), + reply: replyEvent ? serializeEvent(replyEvent) : null, + }); + } catch (error) { + return collaborationErrorResult('wg_ask', error); + } + }, + ); + + server.registerTool( + 'wg_spawn_thread', + { + title: 'WorkGraph Spawn Thread', + description: 'Create a child thread with inherited context and optional idempotency key.', + inputSchema: { + parentThreadPath: z.string().min(1).describe('Parent thread path for child spawn operation.'), + actor: z.string().optional().describe('Actor identity for write attribution.'), + title: z.string().min(1).describe('New child thread title.'), + goal: z.string().min(1).describe('New child thread goal/body seed.'), + priority: z.string().optional().describe('Optional child priority override.'), + deps: z.array(z.string()).optional().describe('Optional dependency thread refs.'), + tags: z.array(z.string()).optional().describe('Optional child tags.'), + contextRefs: z.array(z.string()).optional().describe('Additional context refs inherited by child thread.'), + space: z.string().optional().describe('Optional space override for the spawned child thread.'), + conversationPath: z.string().optional().describe('Optional conversation to attach spawned child thread.'), + idempotencyKey: z.string().optional().describe('Stable idempotency key for retry-safe spawn.'), + }, + annotations: { + destructiveHint: true, + idempotentHint: false, + }, + }, + async (args) => { + try { + const actor = resolveActor(options.workspacePath, args.actor, options.defaultActor); + assertWriteAllowed(options, actor, ['thread:create', 'mcp:write'], { + action: 'mcp.collaboration.spawn-thread', + target: normalizeThreadPath(args.parentThreadPath), + }); + const parentThreadPath = assertThreadExists(options.workspacePath, args.parentThreadPath); + const parentThread = store.read(options.workspacePath, parentThreadPath)!; + const idempotencyKey = normalizeOptionalString(args.idempotencyKey); + if (idempotencyKey) { + const existing = findSpawnedThreadByKey(options.workspacePath, parentThreadPath, idempotencyKey); + if (existing) { + assertSpawnReplayCompatible(existing, args.title, args.goal); + return collaborationOkResult('wg_spawn_thread', actor, { + operation: 'replayed', + parent_thread_path: parentThreadPath, + idempotency: { + key: idempotencyKey, + replayed: true, + }, + thread: serializeThread(existing), + }); + } + } + const inheritedContextRefs = dedupeStrings([ + ...asStringArray(parentThread.fields.context_refs), + parentThreadPath, + ...(args.contextRefs ?? []), + ]); + const created = thread.createThread(options.workspacePath, args.title, args.goal, actor, { + parent: parentThreadPath, + priority: args.priority, + deps: args.deps, + space: normalizeOptionalString(args.space) ?? normalizeOptionalString(parentThread.fields.space), + context_refs: inheritedContextRefs, + tags: args.tags, + }); + const withMetadata = store.update( + options.workspacePath, + created.path, + { + mcp_spawn_parent: parentThreadPath, + mcp_spawned_by: actor, + mcp_spawned_at: new Date().toISOString(), + ...(idempotencyKey ? { mcp_spawn_idempotency_key: idempotencyKey } : {}), + }, + undefined, + actor, + { + skipAuthorization: true, + action: 'mcp.collaboration.spawn.store', + requiredCapabilities: ['thread:create', 'thread:manage'], + }, + ); + if (args.conversationPath) { + conversation.attachConversationThread( + options.workspacePath, + args.conversationPath, + withMetadata.path, + actor, + ); + } + return collaborationOkResult('wg_spawn_thread', actor, { + operation: 'created', + parent_thread_path: parentThreadPath, + idempotency: { + key: idempotencyKey, + replayed: false, + }, + thread: serializeThread(withMetadata), + }); + } catch (error) { + return collaborationErrorResult('wg_spawn_thread', error); + } + }, + ); + + server.registerTool( + 'wg_heartbeat', + { + title: 'WorkGraph Heartbeat', + description: 'Write agent liveness plus active-work claim heartbeat updates.', + inputSchema: { + actor: z.string().optional().describe('Actor identity to heartbeat.'), + threadPath: z.string().optional().describe('Optional specific thread to heartbeat claim lease for.'), + threadLeaseMinutes: z.number().int().min(1).max(240).optional().describe('Thread lease extension window in minutes.'), + status: z.enum(PRESENCE_STATUSES).optional().describe('Presence status update for actor liveness.'), + currentWork: z.string().optional().describe('Current work/thread marker for presence state.'), + capabilities: z.array(z.string()).optional().describe('Optional runtime capabilities snapshot for presence.'), + }, + annotations: { + destructiveHint: true, + idempotentHint: false, + }, + }, + async (args) => { + try { + const actor = resolveActor(options.workspacePath, args.actor, options.defaultActor); + assertWriteAllowed(options, actor, ['agent:heartbeat', 'thread:update', 'mcp:write'], { + action: 'mcp.collaboration.heartbeat', + target: normalizeOptionalString(args.threadPath) ?? 'threads', + }); + const threadPath = args.threadPath + ? assertThreadExists(options.workspacePath, args.threadPath) + : undefined; + const presence = agent.heartbeat(options.workspacePath, actor, { + status: args.status, + currentTask: normalizeOptionalString(args.currentWork) ?? threadPath, + capabilities: args.capabilities, + actor, + }); + const threadHeartbeat = thread.heartbeatClaim(options.workspacePath, actor, threadPath, { + ttlMinutes: args.threadLeaseMinutes, + }); + return collaborationOkResult('wg_heartbeat', actor, { + operation: 'updated', + actor, + thread_path: threadPath ?? null, + presence: { + path: presence.path, + status: String(presence.fields.status ?? 'unknown'), + current_task: normalizeOptionalString(presence.fields.current_task) ?? null, + last_seen: normalizeOptionalString(presence.fields.last_seen) ?? null, + }, + threads: threadHeartbeat, + }); + } catch (error) { + return collaborationErrorResult('wg_heartbeat', error); + } + }, + ); +} + +function ensureAskEvent( + workspacePath: string, + input: { + actor: string; + threadPath: string; + conversationPath: string; + question: string; + idempotencyKey?: string; + correlationId: string; + evidence?: Array>; + metadata?: Record; + }, +): { + replayed: boolean; + correlationId: string; + askEvent: conversationModule.ConversationEventRecord; +} { + const events = conversation.listConversationEvents(workspacePath, input.conversationPath); + const fromKey = input.idempotencyKey + ? events.find((event) => event.idempotency_key === input.idempotencyKey && event.event_type === 'ask') + : undefined; + if (fromKey) { + assertAskReplayCompatible(fromKey, input.threadPath, input.question, input.correlationId); + return { + replayed: true, + correlationId: fromKey.correlation_id ?? input.correlationId, + askEvent: fromKey, + }; + } + const fromCorrelation = events.find((event) => event.event_type === 'ask' && event.correlation_id === input.correlationId); + if (fromCorrelation) { + assertAskReplayCompatible(fromCorrelation, input.threadPath, input.question, input.correlationId); + return { + replayed: true, + correlationId: input.correlationId, + askEvent: fromCorrelation, + }; + } + const eventId = mintEventId('ask', { + threadPath: input.threadPath, + actor: input.actor, + messageType: 'ask', + correlationId: input.correlationId, + idempotencyKey: input.idempotencyKey, + message: input.question, + }); + const appended = conversation.appendConversationMessage( + workspacePath, + input.conversationPath, + input.actor, + input.question, + { + kind: 'ask', + eventType: 'ask', + eventId, + threadRef: input.threadPath, + correlationId: input.correlationId, + idempotencyKey: input.idempotencyKey, + evidence: input.evidence, + metadata: input.metadata, + }, + ); + const askEvent = findEventById(workspacePath, appended.conversation.path, eventId) + ?? lastConversationEvent(workspacePath, appended.conversation.path); + if (!askEvent) { + throw new McpToolError('INTERNAL_ERROR', 'Ask event was not found after write completion.'); + } + return { + replayed: false, + correlationId: input.correlationId, + askEvent, + }; +} + +function findLatestReplyEvent( + workspacePath: string, + conversationPath: string, + correlationId: string, +): conversationModule.ConversationEventRecord | null { + const matches = conversation + .listConversationEvents(workspacePath, conversationPath) + .filter((event) => event.event_type === 'reply' && ( + event.reply_to === correlationId || event.correlation_id === correlationId + )); + if (matches.length === 0) return null; + return matches.sort((left, right) => left.ts.localeCompare(right.ts)).at(-1) ?? null; +} + +function assertWriteAllowed( + options: WorkgraphMcpServerOptions, + actor: string, + capabilities: string[], + context: { action: string; target: string }, +): void { + const gate = checkWriteGate(options, actor, capabilities, { + action: context.action, + target: context.target, + }); + if (gate.allowed) return; + const reason = gate.reason ?? 'Policy gate blocked MCP write.'; + if (reason.includes('read-only')) { + throw new McpToolError('READ_ONLY', reason); + } + throw new McpToolError('POLICY_DENIED', reason); +} + +function assertThreadExists(workspacePath: string, rawThreadPath: string): string { + const threadPath = normalizeThreadPath(rawThreadPath); + const resolved = store.read(workspacePath, threadPath); + if (!resolved || resolved.type !== 'thread') { + throw new McpToolError('NOT_FOUND', `Thread not found: ${threadPath}`); + } + return threadPath; +} + +function resolveConversationPath( + workspacePath: string, + actor: string, + threadPath: string, + explicitPath?: string, +): string { + if (explicitPath) { + const selected = conversation.getConversation(workspacePath, explicitPath); + if (!selected.summary.threadRefs.includes(threadPath)) { + return conversation.attachConversationThread(workspacePath, explicitPath, threadPath, actor).conversation.path; + } + return selected.conversation.path; + } + const candidates = conversation.listConversations(workspacePath, { threadRef: threadPath }); + if (candidates.length > 0) { + const latest = [...candidates].sort((left, right) => { + const leftUpdated = normalizeOptionalString(left.conversation.fields.updated) ?? ''; + const rightUpdated = normalizeOptionalString(right.conversation.fields.updated) ?? ''; + return rightUpdated.localeCompare(leftUpdated); + })[0]; + return latest.conversation.path; + } + const threadInstance = store.read(workspacePath, threadPath); + const title = normalizeOptionalString(threadInstance?.fields.title) ?? threadPath; + const created = conversation.createConversation( + workspacePath, + `Coordination: ${title}`, + actor, + { + status: 'active', + threadRefs: [threadPath], + owner: actor, + }, + ); + return created.conversation.path; +} + +function findEventById( + workspacePath: string, + conversationPath: string, + eventId: string, +): conversationModule.ConversationEventRecord | null { + return conversation + .listConversationEvents(workspacePath, conversationPath) + .find((event) => event.id === eventId) ?? null; +} + +function lastConversationEvent( + workspacePath: string, + conversationPath: string, +): conversationModule.ConversationEventRecord | null { + const events = conversation.listConversationEvents(workspacePath, conversationPath); + return events.length > 0 ? events[events.length - 1] : null; +} + +function findEventByIdempotencyKey( + workspacePath: string, + conversationPath: string, + idempotencyKey: string, +): conversationModule.ConversationEventRecord | null { + return conversation + .listConversationEvents(workspacePath, conversationPath) + .find((event) => event.idempotency_key === idempotencyKey) ?? null; +} + +function findSpawnedThreadByKey( + workspacePath: string, + parentThreadPath: string, + idempotencyKey: string, +) { + return store.list(workspacePath, 'thread').find((entry) => + normalizeOptionalString(entry.fields.parent) === parentThreadPath && + normalizeOptionalString(entry.fields.mcp_spawn_idempotency_key) === idempotencyKey + ) ?? null; +} + +function assertPostReplayCompatible( + existing: conversationModule.ConversationEventRecord, + input: { + threadPath: string; + body: string; + messageType: string; + correlationId?: string; + replyToCorrelationId?: string; + evidence?: Array>; + metadata?: Record; + }, +): void { + const expectedEvidence = stableStringify(input.evidence ?? []); + const actualEvidence = stableStringify(existing.evidence ?? []); + const expectedMetadata = stableStringify(input.metadata ?? {}); + const actualMetadata = stableStringify(existing.metadata ?? {}); + if ( + existing.message !== input.body || + (existing.event_type ?? existing.kind) !== input.messageType || + normalizeOptionalString(existing.thread_ref) !== input.threadPath || + normalizeOptionalString(existing.correlation_id) !== input.correlationId || + normalizeOptionalString(existing.reply_to) !== input.replyToCorrelationId || + expectedEvidence !== actualEvidence || + expectedMetadata !== actualMetadata + ) { + throw new McpToolError( + 'IDEMPOTENCY_CONFLICT', + `Idempotency key "${existing.idempotency_key}" was previously used with a different message payload.`, + { + details: { + previous_event_id: existing.id ?? null, + }, + }, + ); + } +} + +function assertAskReplayCompatible( + existing: conversationModule.ConversationEventRecord, + threadPath: string, + question: string, + correlationId: string, +): void { + if ( + existing.message !== question || + normalizeOptionalString(existing.thread_ref) !== threadPath || + normalizeOptionalString(existing.correlation_id) !== correlationId + ) { + throw new McpToolError( + 'IDEMPOTENCY_CONFLICT', + `Idempotent ask replay conflict for correlation "${correlationId}".`, + { + details: { + previous_event_id: existing.id ?? null, + }, + }, + ); + } +} + +function assertSpawnReplayCompatible(existing: { fields: Record }, title: string, goal: string): void { + const existingTitle = normalizeOptionalString(existing.fields.title); + const existingGoal = normalizeOptionalString(existing.fields.goal); + if (existingTitle !== title || existingGoal !== goal) { + throw new McpToolError( + 'IDEMPOTENCY_CONFLICT', + 'Spawn idempotency key was reused with different child-thread payload.', + { + details: { + previous_title: existingTitle ?? null, + previous_goal: existingGoal ?? null, + }, + }, + ); + } +} + +function serializeEvent(event: conversationModule.ConversationEventRecord) { + return { + id: event.id ?? null, + ts: event.ts, + actor: event.actor, + kind: event.kind, + event_type: event.event_type ?? event.kind, + message: event.message, + thread_ref: event.thread_ref ?? null, + correlation_id: event.correlation_id ?? null, + reply_to: event.reply_to ?? null, + idempotency_key: event.idempotency_key ?? null, + evidence: event.evidence ?? [], + metadata: event.metadata ?? {}, + }; +} + +function serializeThread(entry: { path: string; fields: Record }) { + return { + path: entry.path, + title: normalizeOptionalString(entry.fields.title) ?? entry.path, + goal: normalizeOptionalString(entry.fields.goal) ?? '', + status: normalizeOptionalString(entry.fields.status) ?? 'unknown', + owner: normalizeOptionalString(entry.fields.owner) ?? null, + parent: normalizeOptionalString(entry.fields.parent) ?? null, + space: normalizeOptionalString(entry.fields.space) ?? null, + context_refs: asStringArray(entry.fields.context_refs), + deps: asStringArray(entry.fields.deps), + tags: asStringArray(entry.fields.tags), + updated: normalizeOptionalString(entry.fields.updated) ?? null, + }; +} + +function normalizeThreadPath(value: string): string { + const trimmed = String(value ?? '').trim().replace(/^\.\//, '').replace(/\\/g, '/'); + if (!trimmed) { + throw new McpToolError('BAD_INPUT', 'Thread path is required.'); + } + const withPrefix = trimmed.includes('/') ? trimmed : `threads/${trimmed}`; + const withExtension = withPrefix.endsWith('.md') ? withPrefix : `${withPrefix}.md`; + return withExtension; +} + +function normalizeOptionalString(value: unknown): string | undefined { + if (typeof value !== 'string') return undefined; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +function asStringArray(value: unknown): string[] { + if (!Array.isArray(value)) return []; + return value + .map((item) => normalizeOptionalString(item)) + .filter((item): item is string => !!item); +} + +function dedupeStrings(values: string[]): string[] { + return [...new Set(values.map((value) => value.trim()).filter((value) => value.length > 0))]; +} + +function mintEventId( + prefix: 'msg' | 'ask', + seed: { + threadPath: string; + actor: string; + messageType: string; + correlationId?: string; + idempotencyKey?: string; + message: string; + }, +): string { + const raw = stableStringify({ + prefix, + ...seed, + nonce: randomUUID(), + }); + return `${prefix}_${createHash('sha1').update(raw).digest('hex').slice(0, 16)}`; +} + +function correlationIdFromIdempotency(idempotencyKey: string): string { + return `corr_${createHash('sha1').update(idempotencyKey).digest('hex').slice(0, 12)}`; +} + +function correlationIdFromQuestion(threadPath: string, question: string): string { + const raw = `${threadPath}|${question}|${new Date().toISOString()}|${randomUUID()}`; + return `corr_${createHash('sha1').update(raw).digest('hex').slice(0, 12)}`; +} + +function stableStringify(value: unknown): string { + if (value === null || typeof value !== 'object') { + return JSON.stringify(value); + } + if (Array.isArray(value)) { + return `[${value.map((item) => stableStringify(item)).join(',')}]`; + } + const record = value as Record; + return `{${Object.keys(record) + .sort((left, right) => left.localeCompare(right)) + .map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`) + .join(',')}}`; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +}