diff --git a/.docs/subagent-worktrees.md b/.docs/subagent-worktrees.md new file mode 100644 index 0000000000..7c1808d3cc --- /dev/null +++ b/.docs/subagent-worktrees.md @@ -0,0 +1,125 @@ +# Sub-Agent Worktrees + +## Summary + +T3 Code now supports specialist sub-agents that run in isolated git worktrees and return only a distilled report to the parent thread. + +The current implementation is Codex-first and uses external Codex skills from: + +- `CODEX_HOME/skills` +- fallback: `~/.codex/skills` + +## User Flow + +1. A user runs `/skill `. +2. The server creates a `SubagentRun` on the parent thread with status `preparing`. +3. `SubagentCoordinator` creates a new git worktree and branch for the run. +4. The server creates a hidden orchestration thread with: + - `threadKind = "subagent"` + - `parentThreadId =
` +5. The hidden thread starts a normal Codex turn with skill prompt injected through `developerInstructions`. +6. When the hidden turn quiesces, the server synthesizes a structured report from the hidden assistant output and checkpoint data. +7. The parent thread shows a specialist report card instead of the hidden conversation history. + +## Important Behavior + +- Hidden sub-agent threads do not appear in the normal thread list. +- `Open worktree thread` creates a normal visible thread on the retained sub-agent worktree. +- `Use report` inserts the distilled report into the composer only. It does not auto-send. +- `Discard` cleans up the sub-agent worktree and branch. +- If a visible thread was opened on that worktree, cleanup now detaches it back to a normal local thread so it does not keep a dead `cwd`. +- Generic thread deletion also treats already-missing worktrees as a safe no-op. + +## Current Skill Source + +Skill discovery is server-side and reads `SKILL.md` files from the external Codex skill home. + +V1 assumptions: + +- no repo-local `.agents/skills` +- no VS Code extension internals +- no `tools.json` +- no `report-schema.md` + +The only required skill artifact is `SKILL.md`. + +## Key Files + +### Contracts + +- `packages/contracts/src/subagent.ts` +- `packages/contracts/src/orchestration.ts` +- `packages/contracts/src/provider.ts` +- `packages/contracts/src/server.ts` + +### Server + +- `apps/server/src/subagents/Layers/SkillCatalog.ts` +- `apps/server/src/subagents/Layers/SubagentCoordinator.ts` +- `apps/server/src/wsServer.ts` +- `apps/server/src/codexAppServerManager.ts` +- `apps/server/src/provider/Layers/CodexAdapter.ts` +- `apps/server/src/orchestration/Layers/ProjectionPipeline.ts` +- `apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts` +- `apps/server/src/git/Layers/GitCore.ts` + +### Persistence + +- `apps/server/src/persistence/Migrations/014_ProjectionThreadSubagentRuns.ts` +- `apps/server/src/persistence/Layers/ProjectionThreadSubagentRuns.ts` +- `apps/server/src/persistence/Services/ProjectionThreadSubagentRuns.ts` + +### Web + +- `apps/web/src/composer-logic.ts` +- `apps/web/src/components/chat/ComposerCommandMenu.tsx` +- `apps/web/src/components/chat/MessagesTimeline.tsx` +- `apps/web/src/components/chat/SubagentReportCard.tsx` +- `apps/web/src/components/ChatView.tsx` +- `apps/web/src/session-logic.ts` + +## Status Model + +`SubagentRun.status` currently uses: + +- `preparing` +- `running` +- `report_ready` +- `accepted` +- `retained` +- `cleaned_up` +- `failed` +- `cleanup_failed` + +## Cleanup Rules + +- Clean worktree after report acceptance: can be auto-cleaned. +- Dirty worktree after report acceptance: retained until explicit discard. +- If cleanup runs after a visible worktree thread was opened, that visible thread is detached from the removed worktree. +- If sidebar thread deletion tries to remove a worktree path that is already gone, `GitCore.removeWorktree` now treats that as a no-op. + +## Known Design Choices + +- The hidden sub-agent conversation remains server-side state and is not replayed into the main thread. +- The visible thread opened from a sub-agent worktree starts with normal visible-thread history, not the hidden sub-agent transcript. +- The parent thread only receives the distilled report card. +- The server uses turn quiescence and orchestration projections, not file watching, to finalize reports. + +## If You Want To Extend This + +Common next steps: + +- improve report synthesis quality and strictness +- support richer skill metadata beyond `SKILL.md` +- add branch merge/apply UX for retained sub-agent worktrees +- add explicit UI state for detached former worktree threads +- support providers beyond Codex + +## Verification + +This implementation was verified with: + +- `bun fmt` +- `bun lint` +- `bun typecheck` +- `bun run test` diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index c5eb125aba..eec99ee20b 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -60,6 +60,7 @@ import { RuntimeReceiptBus, type OrchestrationRuntimeReceipt, } from "../src/orchestration/Services/RuntimeReceiptBus.ts"; +import { SubagentCoordinator } from "../src/subagents/Services/SubagentCoordinator.ts"; import { makeTestProviderAdapterHarness, @@ -292,6 +293,7 @@ export const makeOrchestrationIntegrationHarness = ( const gitCoreLayer = Layer.succeed(GitCore, { renameBranch: (input: Parameters[0]) => Effect.succeed({ branch: input.newBranch }), + deleteLocalBranch: () => Effect.void, } as unknown as GitCoreShape); const textGenerationLayer = Layer.succeed(TextGeneration, { generateBranchName: () => Effect.succeed({ branch: null }), @@ -308,6 +310,11 @@ export const makeOrchestrationIntegrationHarness = ( Layer.provideMerge(runtimeIngestionLayer), Layer.provideMerge(providerCommandReactorLayer), Layer.provideMerge(checkpointReactorLayer), + Layer.provideMerge( + Layer.succeed(SubagentCoordinator, { + start: Effect.void, + }), + ), ); const layer = orchestrationReactorLayer.pipe( Layer.provide(persistenceLayer), diff --git a/apps/server/src/codexAppServerManager.ts b/apps/server/src/codexAppServerManager.ts index a8a8ce4607..ed9060a726 100644 --- a/apps/server/src/codexAppServerManager.ts +++ b/apps/server/src/codexAppServerManager.ts @@ -121,6 +121,7 @@ export interface CodexAppServerSendTurnInput { readonly serviceTier?: string | null; readonly effort?: string; readonly interactionMode?: ProviderInteractionMode; + readonly developerInstructions?: string; } export interface CodexAppServerStartSessionInput { @@ -418,6 +419,7 @@ function buildCodexCollaborationMode(input: { readonly interactionMode?: "default" | "plan"; readonly model?: string; readonly effort?: string; + readonly developerInstructions?: string; }): | { mode: "default" | "plan"; @@ -428,19 +430,21 @@ function buildCodexCollaborationMode(input: { }; } | undefined { - if (input.interactionMode === undefined) { + if (input.interactionMode === undefined && input.developerInstructions === undefined) { return undefined; } + const mode = input.interactionMode ?? "default"; const model = normalizeCodexModelSlug(input.model) ?? "gpt-5.3-codex"; return { - mode: input.interactionMode, + mode, settings: { model, reasoning_effort: input.effort ?? "medium", developer_instructions: - input.interactionMode === "plan" + input.developerInstructions ?? + (mode === "plan" ? CODEX_PLAN_MODE_DEVELOPER_INSTRUCTIONS - : CODEX_DEFAULT_MODE_DEVELOPER_INSTRUCTIONS, + : CODEX_DEFAULT_MODE_DEVELOPER_INSTRUCTIONS), }, }; } @@ -800,6 +804,9 @@ export class CodexAppServerManager extends EventEmitter fetchPullRequestBranch: (input) => core.fetchPullRequestBranch(input), ensureRemote: (input) => core.ensureRemote(input), fetchRemoteBranch: (input) => core.fetchRemoteBranch(input), + deleteLocalBranch: (input) => core.deleteLocalBranch(input), setBranchUpstream: (input) => core.setBranchUpstream(input), removeWorktree: (input) => core.removeWorktree(input), renameBranch: (input) => core.renameBranch(input), @@ -1633,24 +1634,14 @@ it.layer(TestLayer)("git integration", (it) => { }), ); - it.effect("includes command context when worktree removal fails", () => + it.effect("treats removing an already-missing worktree as a no-op", () => Effect.gen(function* () { const tmp = yield* makeTmpDir(); yield* initRepoWithCommit(tmp); const core = yield* GitCore; const missingWorktreePath = path.join(tmp, "missing-worktree"); - const removeResult = yield* Effect.result( - core.removeWorktree({ cwd: tmp, path: missingWorktreePath }), - ); - expect(removeResult._tag).toBe("Failure"); - if (removeResult._tag !== "Failure") { - return; - } - const message = removeResult.failure.message; - expect(message).toContain("git worktree remove"); - expect(message).toContain(`cwd: ${tmp}`); - expect(message).toContain(missingWorktreePath); + yield* core.removeWorktree({ cwd: tmp, path: missingWorktreePath }); }), ); diff --git a/apps/server/src/git/Layers/GitCore.ts b/apps/server/src/git/Layers/GitCore.ts index f5b9168abb..ac3375f501 100644 --- a/apps/server/src/git/Layers/GitCore.ts +++ b/apps/server/src/git/Layers/GitCore.ts @@ -1253,26 +1253,45 @@ const makeGitCore = Effect.gen(function* () { input.branch, ]); + const deleteLocalBranch: GitCoreShape["deleteLocalBranch"] = (input) => + runGit("GitCore.deleteLocalBranch", input.cwd, [ + "branch", + input.force ? "-D" : "-d", + input.branch, + ]); + const removeWorktree: GitCoreShape["removeWorktree"] = (input) => Effect.gen(function* () { + const worktreeExists = yield* fileSystem + .exists(input.path) + .pipe(Effect.orElseSucceed(() => false)); + if (!worktreeExists) { + return; + } const args = ["worktree", "remove"]; if (input.force) { args.push("--force"); } args.push(input.path); - yield* executeGit("GitCore.removeWorktree", input.cwd, args, { - timeoutMs: 15_000, - fallbackErrorMessage: "git worktree remove failed", - }).pipe( - Effect.mapError((error) => - createGitCommandError( - "GitCore.removeWorktree", - input.cwd, - args, - `${commandLabel(args)} failed (cwd: ${input.cwd}): ${error instanceof Error ? error.message : String(error)}`, - error, - ), - ), + const result = yield* Effect.result( + executeGit("GitCore.removeWorktree", input.cwd, args, { + timeoutMs: 15_000, + fallbackErrorMessage: "git worktree remove failed", + }), + ); + if (result._tag === "Success") { + return; + } + const message = result.failure.message.toLowerCase(); + if (message.includes("is not a working tree")) { + return; + } + return yield* createGitCommandError( + "GitCore.removeWorktree", + input.cwd, + args, + `${commandLabel(args)} failed (cwd: ${input.cwd}): ${result.failure instanceof Error ? result.failure.message : String(result.failure)}`, + result.failure, ); }); @@ -1415,6 +1434,7 @@ const makeGitCore = Effect.gen(function* () { fetchPullRequestBranch, ensureRemote, fetchRemoteBranch, + deleteLocalBranch, setBranchUpstream, removeWorktree, renameBranch, diff --git a/apps/server/src/git/Services/GitCore.ts b/apps/server/src/git/Services/GitCore.ts index 879927934e..f08843a366 100644 --- a/apps/server/src/git/Services/GitCore.ts +++ b/apps/server/src/git/Services/GitCore.ts @@ -75,6 +75,12 @@ export interface GitFetchRemoteBranchInput { localBranch: string; } +export interface GitDeleteLocalBranchInput { + cwd: string; + branch: string; + force?: boolean; +} + export interface GitSetBranchUpstreamInput { cwd: string; branch: string; @@ -175,6 +181,13 @@ export interface GitCoreShape { input: GitFetchRemoteBranchInput, ) => Effect.Effect; + /** + * Delete a local branch. + */ + readonly deleteLocalBranch: ( + input: GitDeleteLocalBranchInput, + ) => Effect.Effect; + /** * Set the upstream tracking branch for a local branch. */ diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts index 1514bef595..c901d253a5 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.test.ts @@ -6,6 +6,7 @@ import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; import { OrchestrationReactor } from "../Services/OrchestrationReactor.ts"; import { makeOrchestrationReactor } from "./OrchestrationReactor.ts"; +import { SubagentCoordinator } from "../../subagents/Services/SubagentCoordinator.ts"; describe("OrchestrationReactor", () => { let runtime: ManagedRuntime.ManagedRuntime | null = null; @@ -46,6 +47,13 @@ describe("OrchestrationReactor", () => { drain: Effect.void, }), ), + Layer.provideMerge( + Layer.succeed(SubagentCoordinator, { + start: Effect.sync(() => { + started.push("subagent-coordinator"); + }), + }), + ), ), ); @@ -57,6 +65,7 @@ describe("OrchestrationReactor", () => { "provider-runtime-ingestion", "provider-command-reactor", "checkpoint-reactor", + "subagent-coordinator", ]); await Effect.runPromise(Scope.close(scope, Exit.void)); diff --git a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts index 1e498885a0..c0dac3639c 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationReactor.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationReactor.ts @@ -7,16 +7,19 @@ import { import { CheckpointReactor } from "../Services/CheckpointReactor.ts"; import { ProviderCommandReactor } from "../Services/ProviderCommandReactor.ts"; import { ProviderRuntimeIngestionService } from "../Services/ProviderRuntimeIngestion.ts"; +import { SubagentCoordinator } from "../../subagents/Services/SubagentCoordinator.ts"; export const makeOrchestrationReactor = Effect.gen(function* () { const providerRuntimeIngestion = yield* ProviderRuntimeIngestionService; const providerCommandReactor = yield* ProviderCommandReactor; const checkpointReactor = yield* CheckpointReactor; + const subagentCoordinator = yield* SubagentCoordinator; const start: OrchestrationReactorShape["start"] = Effect.gen(function* () { yield* providerRuntimeIngestion.start; yield* providerCommandReactor.start; yield* checkpointReactor.start; + yield* subagentCoordinator.start; }); return { diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 6ae94105a6..ceb48a1136 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -23,6 +23,7 @@ import { ProjectionThreadProposedPlanRepository, } from "../../persistence/Services/ProjectionThreadProposedPlans.ts"; import { ProjectionThreadSessionRepository } from "../../persistence/Services/ProjectionThreadSessions.ts"; +import { ProjectionThreadSubagentRunRepository } from "../../persistence/Services/ProjectionThreadSubagentRuns.ts"; import { type ProjectionTurn, ProjectionTurnRepository, @@ -35,6 +36,7 @@ import { ProjectionThreadActivityRepositoryLive } from "../../persistence/Layers import { ProjectionThreadMessageRepositoryLive } from "../../persistence/Layers/ProjectionThreadMessages.ts"; import { ProjectionThreadProposedPlanRepositoryLive } from "../../persistence/Layers/ProjectionThreadProposedPlans.ts"; import { ProjectionThreadSessionRepositoryLive } from "../../persistence/Layers/ProjectionThreadSessions.ts"; +import { ProjectionThreadSubagentRunRepositoryLive } from "../../persistence/Layers/ProjectionThreadSubagentRuns.ts"; import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts"; import { ProjectionThreadRepositoryLive } from "../../persistence/Layers/ProjectionThreads.ts"; import { ServerConfig } from "../../config.ts"; @@ -56,6 +58,7 @@ export const ORCHESTRATION_PROJECTOR_NAMES = { threadProposedPlans: "projection.thread-proposed-plans", threadActivities: "projection.thread-activities", threadSessions: "projection.thread-sessions", + threadSubagentRuns: "projection.thread-subagent-runs", threadTurns: "projection.thread-turns", checkpoints: "projection.checkpoints", pendingApprovals: "projection.pending-approvals", @@ -347,6 +350,7 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { const projectionThreadProposedPlanRepository = yield* ProjectionThreadProposedPlanRepository; const projectionThreadActivityRepository = yield* ProjectionThreadActivityRepository; const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository; + const projectionThreadSubagentRunRepository = yield* ProjectionThreadSubagentRunRepository; const projectionTurnRepository = yield* ProjectionTurnRepository; const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository; @@ -423,6 +427,8 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { model: event.payload.model, runtimeMode: event.payload.runtimeMode, interactionMode: event.payload.interactionMode, + threadKind: event.payload.threadKind ?? "primary", + parentThreadId: event.payload.parentThreadId ?? null, branch: event.payload.branch, worktreePath: event.payload.worktreePath, latestTurnId: null, @@ -500,7 +506,8 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { case "thread.message-sent": case "thread.proposed-plan-upserted": - case "thread.activity-appended": { + case "thread.activity-appended": + case "thread.subagent-upserted": { const existingRow = yield* projectionThreadRepository.getById({ threadId: event.payload.threadId, }); @@ -564,6 +571,30 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); + const applyThreadSubagentRunsProjection: ProjectorDefinition["apply"] = (event) => + Effect.gen(function* () { + if (event.type !== "thread.subagent-upserted") { + return; + } + yield* projectionThreadSubagentRunRepository.upsert({ + runId: event.payload.subagentRun.id, + parentThreadId: event.payload.threadId, + subagentThreadId: event.payload.subagentRun.subagentThreadId, + skillId: event.payload.subagentRun.skillId, + skillTitle: event.payload.subagentRun.skillTitle, + task: event.payload.subagentRun.task, + status: event.payload.subagentRun.status, + branch: event.payload.subagentRun.branch, + worktreePath: event.payload.subagentRun.worktreePath, + report: event.payload.subagentRun.report, + lastError: event.payload.subagentRun.lastError, + createdAt: event.payload.subagentRun.createdAt, + updatedAt: event.payload.subagentRun.updatedAt, + completedAt: event.payload.subagentRun.completedAt, + acceptedAt: event.payload.subagentRun.acceptedAt, + }); + }); + const applyThreadMessagesProjection: ProjectorDefinition["apply"] = ( event, attachmentSideEffects, @@ -1112,6 +1143,10 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { name: ORCHESTRATION_PROJECTOR_NAMES.threadSessions, apply: applyThreadSessionsProjection, }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.threadSubagentRuns, + apply: applyThreadSubagentRunsProjection, + }, { name: ORCHESTRATION_PROJECTOR_NAMES.threadTurns, apply: applyThreadTurnsProjection, @@ -1226,6 +1261,7 @@ export const OrchestrationProjectionPipelineLive = Layer.effect( Layer.provideMerge(ProjectionThreadProposedPlanRepositoryLive), Layer.provideMerge(ProjectionThreadActivityRepositoryLive), Layer.provideMerge(ProjectionThreadSessionRepositoryLive), + Layer.provideMerge(ProjectionThreadSubagentRunRepositoryLive), Layer.provideMerge(ProjectionTurnRepositoryLive), Layer.provideMerge(ProjectionPendingApprovalRepositoryLive), Layer.provideMerge(ProjectionStateRepositoryLive), diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index fc7db54802..1f95db834e 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -229,6 +229,8 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { model: "gpt-5-codex", interactionMode: "default", runtimeMode: "full-access", + threadKind: "primary", + parentThreadId: null, branch: null, worktreePath: null, latestTurn: { @@ -276,6 +278,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { completedAt: "2026-02-24T00:00:08.000Z", }, ], + subagentRuns: [], session: { threadId: ThreadId.makeUnsafe("thread-1"), status: "running", diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 5fd38a5401..d2cd18bfac 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -6,6 +6,7 @@ import { OrchestrationCheckpointFile, OrchestrationReadModel, ProjectScript, + SubagentReport, TurnId, type OrchestrationCheckpointSummary, type OrchestrationLatestTurn, @@ -33,6 +34,7 @@ import { ProjectionThreadActivity } from "../../persistence/Services/ProjectionT import { ProjectionThreadMessage } from "../../persistence/Services/ProjectionThreadMessages.ts"; import { ProjectionThreadProposedPlan } from "../../persistence/Services/ProjectionThreadProposedPlans.ts"; import { ProjectionThreadSession } from "../../persistence/Services/ProjectionThreadSessions.ts"; +import { ProjectionThreadSubagentRun } from "../../persistence/Services/ProjectionThreadSubagentRuns.ts"; import { ProjectionThread } from "../../persistence/Services/ProjectionThreads.ts"; import { ORCHESTRATION_PROJECTOR_NAMES } from "./ProjectionPipeline.ts"; import { @@ -54,6 +56,11 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields( ); const ProjectionThreadProposedPlanDbRowSchema = ProjectionThreadProposedPlan; const ProjectionThreadDbRowSchema = ProjectionThread; +const ProjectionThreadSubagentRunDbRowSchema = ProjectionThreadSubagentRun.mapFields( + Struct.assign({ + report: Schema.NullOr(Schema.fromJsonString(Schema.NullOr(SubagentReport))), + }), +); const ProjectionThreadActivityDbRowSchema = ProjectionThreadActivity.mapFields( Struct.assign({ payload: Schema.fromJsonString(Schema.Unknown), @@ -84,6 +91,7 @@ const REQUIRED_SNAPSHOT_PROJECTORS = [ ORCHESTRATION_PROJECTOR_NAMES.threadProposedPlans, ORCHESTRATION_PROJECTOR_NAMES.threadActivities, ORCHESTRATION_PROJECTOR_NAMES.threadSessions, + ORCHESTRATION_PROJECTOR_NAMES.threadSubagentRuns, ORCHESTRATION_PROJECTOR_NAMES.checkpoints, ] as const; @@ -159,6 +167,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { model, runtime_mode AS "runtimeMode", interaction_mode AS "interactionMode", + thread_kind AS "threadKind", + parent_thread_id AS "parentThreadId", branch, worktree_path AS "worktreePath", latest_turn_id AS "latestTurnId", @@ -170,6 +180,32 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listThreadSubagentRunRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadSubagentRunDbRowSchema, + execute: () => + sql` + SELECT + run_id AS "runId", + parent_thread_id AS "parentThreadId", + subagent_thread_id AS "subagentThreadId", + skill_id AS "skillId", + skill_title AS "skillTitle", + task, + status, + branch, + worktree_path AS "worktreePath", + report_json AS "report", + last_error AS "lastError", + created_at AS "createdAt", + updated_at AS "updatedAt", + completed_at AS "completedAt", + accepted_at AS "acceptedAt" + FROM projection_thread_subagent_runs + ORDER BY parent_thread_id ASC, created_at ASC, run_id ASC + `, + }); + const listThreadMessageRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionThreadMessageDbRowSchema, @@ -315,6 +351,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { proposedPlanRows, activityRows, sessionRows, + subagentRunRows, checkpointRows, latestTurnRows, stateRows, @@ -367,6 +404,14 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { ), ), ), + listThreadSubagentRunRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadSubagentRuns:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadSubagentRuns:decodeRows", + ), + ), + ), listCheckpointRows(undefined).pipe( Effect.mapError( toPersistenceSqlOrDecodeError( @@ -398,6 +443,10 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { const activitiesByThread = new Map>(); const checkpointsByThread = new Map>(); const sessionsByThread = new Map(); + const subagentRunsByThread = new Map< + string, + Array[number]> + >(); const latestTurnByThread = new Map(); let updatedAt: string | null = null; @@ -411,6 +460,9 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { for (const row of stateRows) { updatedAt = maxIso(updatedAt, row.updatedAt); } + for (const row of subagentRunRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + } for (const row of messageRows) { updatedAt = maxIso(updatedAt, row.updatedAt); @@ -513,6 +565,28 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { }); } + for (const row of subagentRunRows) { + const threadRuns = subagentRunsByThread.get(row.parentThreadId) ?? []; + threadRuns.push({ + id: row.runId, + parentThreadId: row.parentThreadId, + subagentThreadId: row.subagentThreadId, + skillId: row.skillId, + skillTitle: row.skillTitle, + task: row.task, + status: row.status, + branch: row.branch, + worktreePath: row.worktreePath, + report: row.report, + lastError: row.lastError, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + completedAt: row.completedAt, + acceptedAt: row.acceptedAt, + }); + subagentRunsByThread.set(row.parentThreadId, threadRuns); + } + const projects: Array = projectRows.map((row) => ({ id: row.projectId, title: row.title, @@ -524,25 +598,37 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { deletedAt: row.deletedAt, })); - const threads: Array = threadRows.map((row) => ({ - id: row.threadId, - projectId: row.projectId, - title: row.title, - model: row.model, - runtimeMode: row.runtimeMode, - interactionMode: row.interactionMode, - branch: row.branch, - worktreePath: row.worktreePath, - latestTurn: latestTurnByThread.get(row.threadId) ?? null, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - deletedAt: row.deletedAt, - messages: messagesByThread.get(row.threadId) ?? [], - proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], - activities: activitiesByThread.get(row.threadId) ?? [], - checkpoints: checkpointsByThread.get(row.threadId) ?? [], - session: sessionsByThread.get(row.threadId) ?? null, - })); + const threads: Array = threadRows + .filter((row) => (row.threadKind ?? "primary") !== "subagent") + .map((row) => ({ + id: row.threadId, + projectId: row.projectId, + title: row.title, + model: row.model, + runtimeMode: row.runtimeMode, + interactionMode: row.interactionMode, + threadKind: row.threadKind ?? "primary", + parentThreadId: row.parentThreadId ?? null, + branch: row.branch, + worktreePath: row.worktreePath, + latestTurn: latestTurnByThread.get(row.threadId) ?? null, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + deletedAt: row.deletedAt, + messages: messagesByThread.get(row.threadId) ?? [], + proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], + activities: activitiesByThread.get(row.threadId) ?? [], + checkpoints: checkpointsByThread.get(row.threadId) ?? [], + subagentRuns: + subagentRunsByThread + .get(row.threadId) + ?.toSorted( + (left, right) => + left.createdAt.localeCompare(right.createdAt) || + left.id.localeCompare(right.id), + ) ?? [], + session: sessionsByThread.get(row.threadId) ?? null, + })); const snapshot = { snapshotSequence: computeSnapshotSequence(stateRows), diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index fe02188450..9c4b49cad3 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -320,6 +320,7 @@ const make = Effect.gen(function* () { readonly modelOptions?: ProviderModelOptions; readonly providerOptions?: ProviderStartOptions; readonly interactionMode?: "default" | "plan"; + readonly developerInstructions?: string; readonly createdAt: string; }) { const thread = yield* resolveThread(input.threadId); @@ -355,6 +356,9 @@ const make = Effect.gen(function* () { ...(modelForTurn !== undefined ? { model: modelForTurn } : {}), ...(input.modelOptions !== undefined ? { modelOptions: input.modelOptions } : {}), ...(input.interactionMode !== undefined ? { interactionMode: input.interactionMode } : {}), + ...(input.developerInstructions !== undefined + ? { developerInstructions: input.developerInstructions } + : {}), }); }); @@ -474,6 +478,9 @@ const make = Effect.gen(function* () { ? { providerOptions: event.payload.providerOptions } : {}), interactionMode: event.payload.interactionMode, + ...(event.payload.developerInstructions !== undefined + ? { developerInstructions: event.payload.developerInstructions } + : {}), createdAt: event.payload.createdAt, }); }); diff --git a/apps/server/src/orchestration/Schemas.ts b/apps/server/src/orchestration/Schemas.ts index c96385cad1..a60e09d947 100644 --- a/apps/server/src/orchestration/Schemas.ts +++ b/apps/server/src/orchestration/Schemas.ts @@ -8,6 +8,10 @@ import { ThreadInteractionModeSetPayload as ContractsThreadInteractionModeSetPayloadSchema, ThreadDeletedPayload as ContractsThreadDeletedPayloadSchema, ThreadMessageSentPayload as ContractsThreadMessageSentPayloadSchema, + ThreadSubagentCleanupRequestedPayload as ContractsThreadSubagentCleanupRequestedPayloadSchema, + ThreadSubagentReportAcceptedPayload as ContractsThreadSubagentReportAcceptedPayloadSchema, + ThreadSubagentStartRequestedPayload as ContractsThreadSubagentStartRequestedPayloadSchema, + ThreadSubagentUpsertedPayload as ContractsThreadSubagentUpsertedPayloadSchema, ThreadProposedPlanUpsertedPayload as ContractsThreadProposedPlanUpsertedPayloadSchema, ThreadSessionSetPayload as ContractsThreadSessionSetPayloadSchema, ThreadTurnDiffCompletedPayload as ContractsThreadTurnDiffCompletedPayloadSchema, @@ -46,3 +50,10 @@ export const ThreadApprovalResponseRequestedPayload = export const ThreadCheckpointRevertRequestedPayload = ContractsThreadCheckpointRevertRequestedPayloadSchema; export const ThreadSessionStopRequestedPayload = ContractsThreadSessionStopRequestedPayloadSchema; +export const ThreadSubagentStartRequestedPayload = + ContractsThreadSubagentStartRequestedPayloadSchema; +export const ThreadSubagentReportAcceptedPayload = + ContractsThreadSubagentReportAcceptedPayloadSchema; +export const ThreadSubagentCleanupRequestedPayload = + ContractsThreadSubagentCleanupRequestedPayloadSchema; +export const ThreadSubagentUpsertedPayload = ContractsThreadSubagentUpsertedPayloadSchema; diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index eea41a2b35..b8140457eb 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -16,6 +16,12 @@ import { const nowIso = () => new Date().toISOString(); const DEFAULT_ASSISTANT_DELIVERY_MODE = "buffered" as const; +function hasActiveSubagentRun(readModel: OrchestrationReadModel, threadId: string): boolean { + const thread = readModel.threads.find((entry) => entry.id === threadId); + const subagentRuns = thread?.subagentRuns ?? []; + return subagentRuns.some((run) => run.status === "preparing" || run.status === "running"); +} + const defaultMetadata: Omit = { eventId: crypto.randomUUID() as OrchestrationEvent["eventId"], aggregateKind: "thread", @@ -159,6 +165,8 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" model: command.model, runtimeMode: command.runtimeMode, interactionMode: command.interactionMode, + threadKind: command.threadKind ?? "primary", + parentThreadId: command.parentThreadId ?? null, branch: command.branch, worktreePath: command.worktreePath, createdAt: command.createdAt, @@ -312,6 +320,9 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" interactionMode: readModel.threads.find((entry) => entry.id === command.threadId)?.interactionMode ?? command.interactionMode, + ...(command.developerInstructions !== undefined + ? { developerInstructions: command.developerInstructions } + : {}), createdAt: command.createdAt, }, }; @@ -435,6 +446,112 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }; } + case "thread.subagent.start": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + if (hasActiveSubagentRun(readModel, command.threadId)) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Thread '${command.threadId}' already has an active sub-agent run.`, + }); + } + const placeholderRun = { + id: command.runId, + parentThreadId: command.threadId, + subagentThreadId: null, + skillId: command.skillId, + skillTitle: command.skillId, + task: command.task, + status: "preparing" as const, + branch: null, + worktreePath: null, + report: null, + lastError: null, + createdAt: command.createdAt, + updatedAt: command.createdAt, + completedAt: null, + acceptedAt: null, + }; + const upsertedEvent: Omit = { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.subagent-upserted", + payload: { + threadId: command.threadId, + subagentRun: placeholderRun, + }, + }; + const requestedEvent: Omit = { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + causationEventId: upsertedEvent.eventId, + type: "thread.subagent-start-requested", + payload: { + threadId: command.threadId, + runId: command.runId, + skillId: command.skillId, + task: command.task, + createdAt: command.createdAt, + }, + }; + return [upsertedEvent, requestedEvent]; + } + + case "thread.subagent.acceptReport": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.subagent-report-accepted", + payload: { + threadId: command.threadId, + runId: command.runId, + createdAt: command.createdAt, + }, + }; + } + + case "thread.subagent.cleanup": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.subagent-cleanup-requested", + payload: { + threadId: command.threadId, + runId: command.runId, + createdAt: command.createdAt, + }, + }; + } + case "thread.session.set": { yield* requireThread({ readModel, @@ -457,6 +574,27 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }; } + case "thread.subagent.upsert": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.subagent-upserted", + payload: { + threadId: command.threadId, + subagentRun: command.subagentRun, + }, + }; + } + case "thread.message.assistant.delta": { yield* requireThread({ readModel, diff --git a/apps/server/src/orchestration/projector.test.ts b/apps/server/src/orchestration/projector.test.ts index 71f5b6bd4b..308eef5c60 100644 --- a/apps/server/src/orchestration/projector.test.ts +++ b/apps/server/src/orchestration/projector.test.ts @@ -76,6 +76,8 @@ describe("orchestration projector", () => { model: "gpt-5-codex", runtimeMode: "full-access", interactionMode: "default", + threadKind: "primary", + parentThreadId: null, branch: null, worktreePath: null, latestTurn: null, @@ -86,6 +88,7 @@ describe("orchestration projector", () => { proposedPlans: [], activities: [], checkpoints: [], + subagentRuns: [], session: null, }, ]); diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 015f82a677..680d12f909 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -14,6 +14,7 @@ import { ProjectDeletedPayload, ProjectMetaUpdatedPayload, ThreadActivityAppendedPayload, + ThreadSubagentUpsertedPayload, ThreadCreatedPayload, ThreadDeletedPayload, ThreadInteractionModeSetPayload, @@ -255,6 +256,8 @@ export function projectEvent( model: payload.model, runtimeMode: payload.runtimeMode, interactionMode: payload.interactionMode, + threadKind: payload.threadKind ?? "primary", + parentThreadId: payload.parentThreadId ?? null, branch: payload.branch, worktreePath: payload.worktreePath, latestTurn: null, @@ -262,6 +265,7 @@ export function projectEvent( updatedAt: payload.updatedAt, deletedAt: null, messages: [], + subagentRuns: [], activities: [], checkpoints: [], session: null, @@ -621,6 +625,37 @@ export function projectEvent( }), ); + case "thread.subagent-upserted": + return decodeForEvent( + ThreadSubagentUpsertedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => { + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const subagentRuns = [ + ...(thread.subagentRuns?.filter((run) => run.id !== payload.subagentRun.id) ?? []), + payload.subagentRun, + ].toSorted( + (left, right) => + left.createdAt.localeCompare(right.createdAt) || left.id.localeCompare(right.id), + ); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + subagentRuns, + updatedAt: event.occurredAt, + }), + }; + }), + ); + default: return Effect.succeed(nextBase); } diff --git a/apps/server/src/persistence/Layers/ProjectionThreadSubagentRuns.ts b/apps/server/src/persistence/Layers/ProjectionThreadSubagentRuns.ts new file mode 100644 index 0000000000..ec30b6917d --- /dev/null +++ b/apps/server/src/persistence/Layers/ProjectionThreadSubagentRuns.ts @@ -0,0 +1,128 @@ +import { Effect, Layer } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; +import { Schema, Struct } from "effect"; + +import { toPersistenceSqlError } from "../Errors.ts"; +import { + ListProjectionThreadSubagentRunsInput, + ProjectionThreadSubagentRun, + ProjectionThreadSubagentRunRepository, + type ProjectionThreadSubagentRunRepositoryShape, +} from "../Services/ProjectionThreadSubagentRuns.ts"; + +const ProjectionThreadSubagentRunDbRowSchema = ProjectionThreadSubagentRun.mapFields( + Struct.assign({ + report: Schema.NullOr(Schema.fromJsonString(ProjectionThreadSubagentRun.fields.report)), + }), +); + +const makeProjectionThreadSubagentRunRepository = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const upsertProjectionThreadSubagentRunRow = SqlSchema.void({ + Request: ProjectionThreadSubagentRun, + execute: (row) => sql` + INSERT INTO projection_thread_subagent_runs ( + run_id, + parent_thread_id, + subagent_thread_id, + skill_id, + skill_title, + task, + status, + branch, + worktree_path, + report_json, + last_error, + created_at, + updated_at, + completed_at, + accepted_at + ) + VALUES ( + ${row.runId}, + ${row.parentThreadId}, + ${row.subagentThreadId}, + ${row.skillId}, + ${row.skillTitle}, + ${row.task}, + ${row.status}, + ${row.branch}, + ${row.worktreePath}, + ${JSON.stringify(row.report)}, + ${row.lastError}, + ${row.createdAt}, + ${row.updatedAt}, + ${row.completedAt}, + ${row.acceptedAt} + ) + ON CONFLICT (run_id) + DO UPDATE SET + parent_thread_id = excluded.parent_thread_id, + subagent_thread_id = excluded.subagent_thread_id, + skill_id = excluded.skill_id, + skill_title = excluded.skill_title, + task = excluded.task, + status = excluded.status, + branch = excluded.branch, + worktree_path = excluded.worktree_path, + report_json = excluded.report_json, + last_error = excluded.last_error, + created_at = excluded.created_at, + updated_at = excluded.updated_at, + completed_at = excluded.completed_at, + accepted_at = excluded.accepted_at + `, + }); + + const listProjectionThreadSubagentRunRows = SqlSchema.findAll({ + Request: ListProjectionThreadSubagentRunsInput, + Result: ProjectionThreadSubagentRunDbRowSchema, + execute: ({ parentThreadId }) => sql` + SELECT + run_id AS "runId", + parent_thread_id AS "parentThreadId", + subagent_thread_id AS "subagentThreadId", + skill_id AS "skillId", + skill_title AS "skillTitle", + task, + status, + branch, + worktree_path AS "worktreePath", + report_json AS "report", + last_error AS "lastError", + created_at AS "createdAt", + updated_at AS "updatedAt", + completed_at AS "completedAt", + accepted_at AS "acceptedAt" + FROM projection_thread_subagent_runs + WHERE parent_thread_id = ${parentThreadId} + ORDER BY created_at ASC, run_id ASC + `, + }); + + const upsert: ProjectionThreadSubagentRunRepositoryShape["upsert"] = (row) => + upsertProjectionThreadSubagentRunRow(row).pipe( + Effect.mapError(toPersistenceSqlError("ProjectionThreadSubagentRunRepository.upsert:query")), + ); + + const listByParentThreadId: ProjectionThreadSubagentRunRepositoryShape["listByParentThreadId"] = ( + input, + ) => + listProjectionThreadSubagentRunRows(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadSubagentRunRepository.listByParentThreadId:query"), + ), + ); + + return { + upsert, + listByParentThreadId, + } satisfies ProjectionThreadSubagentRunRepositoryShape; +}); + +export const ProjectionThreadSubagentRunRepositoryLive = Layer.effect( + ProjectionThreadSubagentRunRepository, + makeProjectionThreadSubagentRunRepository, +); diff --git a/apps/server/src/persistence/Layers/ProjectionThreads.ts b/apps/server/src/persistence/Layers/ProjectionThreads.ts index 10192697d0..4f71056d6d 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreads.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreads.ts @@ -26,6 +26,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { model, runtime_mode, interaction_mode, + thread_kind, + parent_thread_id, branch, worktree_path, latest_turn_id, @@ -40,6 +42,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { ${row.model}, ${row.runtimeMode}, ${row.interactionMode}, + ${row.threadKind ?? "primary"}, + ${row.parentThreadId ?? null}, ${row.branch}, ${row.worktreePath}, ${row.latestTurnId}, @@ -54,6 +58,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { model = excluded.model, runtime_mode = excluded.runtime_mode, interaction_mode = excluded.interaction_mode, + thread_kind = excluded.thread_kind, + parent_thread_id = excluded.parent_thread_id, branch = excluded.branch, worktree_path = excluded.worktree_path, latest_turn_id = excluded.latest_turn_id, @@ -75,6 +81,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { model, runtime_mode AS "runtimeMode", interaction_mode AS "interactionMode", + thread_kind AS "threadKind", + parent_thread_id AS "parentThreadId", branch, worktree_path AS "worktreePath", latest_turn_id AS "latestTurnId", @@ -98,6 +106,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { model, runtime_mode AS "runtimeMode", interaction_mode AS "interactionMode", + thread_kind AS "threadKind", + parent_thread_id AS "parentThreadId", branch, worktree_path AS "worktreePath", latest_turn_id AS "latestTurnId", diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index 7deb890dd8..2327061740 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -25,6 +25,7 @@ import Migration0010 from "./Migrations/010_ProjectionThreadsRuntimeMode.ts"; import Migration0011 from "./Migrations/011_OrchestrationThreadCreatedRuntimeMode.ts"; import Migration0012 from "./Migrations/012_ProjectionThreadsInteractionMode.ts"; import Migration0013 from "./Migrations/013_ProjectionThreadProposedPlans.ts"; +import Migration0014 from "./Migrations/014_ProjectionThreadSubagentRuns.ts"; import { Effect } from "effect"; /** @@ -51,6 +52,7 @@ const loader = Migrator.fromRecord({ "11_OrchestrationThreadCreatedRuntimeMode": Migration0011, "12_ProjectionThreadsInteractionMode": Migration0012, "13_ProjectionThreadProposedPlans": Migration0013, + "14_ProjectionThreadSubagentRuns": Migration0014, }); /** diff --git a/apps/server/src/persistence/Migrations/014_ProjectionThreadSubagentRuns.ts b/apps/server/src/persistence/Migrations/014_ProjectionThreadSubagentRuns.ts new file mode 100644 index 0000000000..dca917da7a --- /dev/null +++ b/apps/server/src/persistence/Migrations/014_ProjectionThreadSubagentRuns.ts @@ -0,0 +1,41 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + ALTER TABLE projection_threads + ADD COLUMN thread_kind TEXT NOT NULL DEFAULT 'primary' + `.pipe(Effect.catchTag("SqlError", () => Effect.void)); + + yield* sql` + ALTER TABLE projection_threads + ADD COLUMN parent_thread_id TEXT + `.pipe(Effect.catchTag("SqlError", () => Effect.void)); + + yield* sql` + CREATE TABLE IF NOT EXISTS projection_thread_subagent_runs ( + run_id TEXT PRIMARY KEY, + parent_thread_id TEXT NOT NULL, + subagent_thread_id TEXT, + skill_id TEXT NOT NULL, + skill_title TEXT NOT NULL, + task TEXT NOT NULL, + status TEXT NOT NULL, + branch TEXT, + worktree_path TEXT, + report_json TEXT, + last_error TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + completed_at TEXT, + accepted_at TEXT + ) + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_thread_subagent_runs_parent_thread + ON projection_thread_subagent_runs(parent_thread_id, created_at) + `; +}); diff --git a/apps/server/src/persistence/Services/ProjectionThreadSubagentRuns.ts b/apps/server/src/persistence/Services/ProjectionThreadSubagentRuns.ts new file mode 100644 index 0000000000..af26cabf03 --- /dev/null +++ b/apps/server/src/persistence/Services/ProjectionThreadSubagentRuns.ts @@ -0,0 +1,51 @@ +import { + IsoDateTime, + SubagentReport, + SubagentRunId, + SubagentRunStatus, + ThreadId, + TrimmedNonEmptyString, +} from "@t3tools/contracts"; +import { Schema, ServiceMap } from "effect"; +import type { Effect } from "effect"; + +import type { ProjectionRepositoryError } from "../Errors.ts"; + +export const ProjectionThreadSubagentRun = Schema.Struct({ + runId: SubagentRunId, + parentThreadId: ThreadId, + subagentThreadId: Schema.NullOr(ThreadId), + skillId: TrimmedNonEmptyString, + skillTitle: TrimmedNonEmptyString, + task: TrimmedNonEmptyString, + status: SubagentRunStatus, + branch: Schema.NullOr(TrimmedNonEmptyString), + worktreePath: Schema.NullOr(TrimmedNonEmptyString), + report: Schema.NullOr(SubagentReport), + lastError: Schema.NullOr(TrimmedNonEmptyString), + createdAt: IsoDateTime, + updatedAt: IsoDateTime, + completedAt: Schema.NullOr(IsoDateTime), + acceptedAt: Schema.NullOr(IsoDateTime), +}); +export type ProjectionThreadSubagentRun = typeof ProjectionThreadSubagentRun.Type; + +export const ListProjectionThreadSubagentRunsInput = Schema.Struct({ + parentThreadId: ThreadId, +}); +export type ListProjectionThreadSubagentRunsInput = + typeof ListProjectionThreadSubagentRunsInput.Type; + +export interface ProjectionThreadSubagentRunRepositoryShape { + readonly upsert: ( + run: ProjectionThreadSubagentRun, + ) => Effect.Effect; + readonly listByParentThreadId: ( + input: ListProjectionThreadSubagentRunsInput, + ) => Effect.Effect, ProjectionRepositoryError>; +} + +export class ProjectionThreadSubagentRunRepository extends ServiceMap.Service< + ProjectionThreadSubagentRunRepository, + ProjectionThreadSubagentRunRepositoryShape +>()("t3/persistence/Services/ProjectionThreadSubagentRuns/ProjectionThreadSubagentRunRepository") {} diff --git a/apps/server/src/persistence/Services/ProjectionThreads.ts b/apps/server/src/persistence/Services/ProjectionThreads.ts index 7a30870f2d..0fc96751cb 100644 --- a/apps/server/src/persistence/Services/ProjectionThreads.ts +++ b/apps/server/src/persistence/Services/ProjectionThreads.ts @@ -8,6 +8,7 @@ */ import { IsoDateTime, + OrchestrationThreadKind, ProjectId, ProviderInteractionMode, RuntimeMode, @@ -26,6 +27,12 @@ export const ProjectionThread = Schema.Struct({ model: Schema.String, runtimeMode: RuntimeMode, interactionMode: ProviderInteractionMode, + threadKind: Schema.optional( + OrchestrationThreadKind.pipe(Schema.withDecodingDefault(() => "primary")), + ), + parentThreadId: Schema.optional( + Schema.NullOr(ThreadId).pipe(Schema.withDecodingDefault(() => null)), + ), branch: Schema.NullOr(Schema.String), worktreePath: Schema.NullOr(Schema.String), latestTurnId: Schema.NullOr(TurnId), diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 1e4b80ae9c..64e4550358 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -1370,6 +1370,9 @@ const makeCodexAdapter = (options?: CodexAdapterLiveOptions) => ...(input.interactionMode !== undefined ? { interactionMode: input.interactionMode } : {}), + ...(input.developerInstructions !== undefined + ? { developerInstructions: input.developerInstructions } + : {}), ...(codexAttachments.length > 0 ? { attachments: codexAttachments } : {}), }; return manager.sendTurn(managerInput); diff --git a/apps/server/src/serverLayers.ts b/apps/server/src/serverLayers.ts index ff9b10d96f..25a16972a9 100644 --- a/apps/server/src/serverLayers.ts +++ b/apps/server/src/serverLayers.ts @@ -36,6 +36,8 @@ import { GitServiceLive } from "./git/Layers/GitService"; import { BunPtyAdapterLive } from "./terminal/Layers/BunPTY"; import { NodePtyAdapterLive } from "./terminal/Layers/NodePTY"; import { AnalyticsService } from "./telemetry/Services/AnalyticsService"; +import { SkillCatalogLive } from "./subagents/Layers/SkillCatalog.ts"; +import { SubagentCoordinatorLive } from "./subagents/Layers/SubagentCoordinator.ts"; export function makeServerProviderLayer(): Layer.Layer< ProviderService, @@ -71,6 +73,7 @@ export function makeServerProviderLayer(): Layer.Layer< export function makeServerRuntimeServicesLayer() { const gitCoreLayer = GitCoreLive.pipe(Layer.provideMerge(GitServiceLive)); const textGenerationLayer = CodexTextGenerationLive; + const skillCatalogLayer = SkillCatalogLive; const orchestrationLayer = OrchestrationEngineLive.pipe( Layer.provide(OrchestrationProjectionPipelineLive), @@ -101,10 +104,16 @@ export function makeServerRuntimeServicesLayer() { const checkpointReactorLayer = CheckpointReactorLive.pipe( Layer.provideMerge(runtimeServicesLayer), ); + const subagentCoordinatorLayer = SubagentCoordinatorLive.pipe( + Layer.provideMerge(runtimeServicesLayer), + Layer.provideMerge(gitCoreLayer), + Layer.provideMerge(skillCatalogLayer), + ); const orchestrationReactorLayer = OrchestrationReactorLive.pipe( Layer.provideMerge(runtimeIngestionLayer), Layer.provideMerge(providerCommandReactorLayer), Layer.provideMerge(checkpointReactorLayer), + Layer.provideMerge(subagentCoordinatorLayer), ); const terminalLayer = TerminalManagerLive.pipe( @@ -126,6 +135,7 @@ export function makeServerRuntimeServicesLayer() { gitCoreLayer, gitManagerLayer, terminalLayer, + skillCatalogLayer, KeybindingsLive, ).pipe(Layer.provideMerge(NodeServices.layer)); } diff --git a/apps/server/src/subagents/Layers/SkillCatalog.ts b/apps/server/src/subagents/Layers/SkillCatalog.ts new file mode 100644 index 0000000000..d30fbaa007 --- /dev/null +++ b/apps/server/src/subagents/Layers/SkillCatalog.ts @@ -0,0 +1,162 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { Effect, Layer, Option } from "effect"; +import { type SubagentSkill } from "@t3tools/contracts"; +import { SkillCatalog, type SkillCatalogShape } from "../Services/SkillCatalog.ts"; + +function normalizeRelativePath(value: string): string { + return value.replaceAll("\\", "/").replace(/^\/+|\/+$/g, ""); +} + +function humanizeSkillName(value: string): string { + return value + .split(/[-_/]+/g) + .filter((part) => part.length > 0) + .map((part) => part[0]?.toUpperCase() + part.slice(1)) + .join(" "); +} + +function extractSkillTitle(markdown: string, fallbackId: string): string { + for (const line of markdown.split(/\r?\n/g)) { + const match = /^#\s+(.+)$/.exec(line.trim()); + if (!match?.[1]) { + continue; + } + const title = match[1].trim(); + if (title.length > 0) { + return title; + } + } + return humanizeSkillName(fallbackId); +} + +function extractSkillSummary(markdown: string): string | undefined { + const lines = markdown.split(/\r?\n/g); + const paragraph: string[] = []; + for (const rawLine of lines) { + const line = rawLine.trim(); + if (line.length === 0) { + if (paragraph.length > 0) { + break; + } + continue; + } + if (line.startsWith("#")) { + continue; + } + if (line.startsWith("```")) { + break; + } + paragraph.push(line); + } + const summary = paragraph.join(" ").trim(); + return summary.length > 0 ? summary : undefined; +} + +const makeSkillCatalog = Effect.sync(() => { + const resolveSkillRoot = (): string => { + const codexHome = process.env.CODEX_HOME?.trim(); + if (codexHome) { + if (codexHome === "~") { + return path.join(os.homedir(), "skills"); + } + if (codexHome.startsWith("~/") || codexHome.startsWith("~\\")) { + return path.join(os.homedir(), codexHome.slice(2), "skills"); + } + return path.join(codexHome, "skills"); + } + return path.join(os.homedir(), ".codex", "skills"); + }; + + const readSkillDirectory = async ( + rootDir: string, + directory: string, + ): Promise> => { + const skillPath = path.join(directory, "SKILL.md"); + try { + const skillFileStat = await fs.stat(skillPath); + if (skillFileStat.isFile()) { + const promptMarkdown = (await fs.readFile(skillPath, "utf8")).trim(); + if (promptMarkdown.length === 0) { + return []; + } + const relativeDirectory = normalizeRelativePath(path.relative(rootDir, directory)); + const segments = relativeDirectory.split("/").filter((segment) => segment.length > 0); + const idSegments = segments[0] === ".system" ? segments.slice(1) : segments; + const skillId = idSegments.join("/"); + if (skillId.length === 0) { + return []; + } + const summary = extractSkillSummary(promptMarkdown); + return [ + { + id: skillId, + title: extractSkillTitle(promptMarkdown, skillId), + path: skillPath, + promptMarkdown, + ...(summary ? { summary } : {}), + } satisfies SubagentSkill, + ]; + } + } catch { + // Not a skill directory; recurse into children below. + } + + let entries: ReadonlyArray = []; + try { + entries = await fs.readdir(directory); + } catch { + return []; + } + + const nested = await Promise.all( + entries.map(async (entryName): Promise> => { + const entryPath = path.join(directory, entryName); + try { + const stat = await fs.stat(entryPath); + if (!stat.isDirectory()) { + return []; + } + return await readSkillDirectory(rootDir, entryPath); + } catch { + return []; + } + }), + ); + return nested.flat(); + }; + + const listSkills: SkillCatalogShape["listSkills"] = () => + Effect.promise(async () => { + const rootDir = resolveSkillRoot(); + try { + const rootStat = await fs.stat(rootDir); + if (!rootStat.isDirectory()) { + return [] as ReadonlyArray; + } + } catch { + return [] as ReadonlyArray; + } + const skills = await readSkillDirectory(rootDir, rootDir); + return skills.toSorted((left: SubagentSkill, right: SubagentSkill) => + left.id.localeCompare(right.id), + ); + }); + + const getSkillById: SkillCatalogShape["getSkillById"] = (skillId) => + listSkills().pipe( + Effect.map((skills) => { + const match = skills.find((skill) => skill.id === skillId); + return match ? Option.some(match) : Option.none(); + }), + ); + + return { + listSkills, + getSkillById, + } satisfies SkillCatalogShape; +}); + +export const SkillCatalogLive = Layer.effect(SkillCatalog, makeSkillCatalog); diff --git a/apps/server/src/subagents/Layers/SubagentCoordinator.test.ts b/apps/server/src/subagents/Layers/SubagentCoordinator.test.ts new file mode 100644 index 0000000000..3a9d2c7420 --- /dev/null +++ b/apps/server/src/subagents/Layers/SubagentCoordinator.test.ts @@ -0,0 +1,287 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import { + CommandId, + ProjectId, + ThreadId, + type ProviderRuntimeEvent, + type ProviderSession, + type SubagentRun, +} from "@t3tools/contracts"; +import { Effect, Exit, Layer, ManagedRuntime, PubSub, Scope, Stream } from "effect"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import * as NodeServices from "@effect/platform-node/NodeServices"; + +import { ServerConfig } from "../../config.ts"; +import { GitCore, type GitCoreShape } from "../../git/Services/GitCore.ts"; +import { OrchestrationCommandReceiptRepositoryLive } from "../../persistence/Layers/OrchestrationCommandReceipts.ts"; +import { OrchestrationEventStoreLive } from "../../persistence/Layers/OrchestrationEventStore.ts"; +import { SqlitePersistenceMemory } from "../../persistence/Layers/Sqlite.ts"; +import { OrchestrationEngineLive } from "../../orchestration/Layers/OrchestrationEngine.ts"; +import { OrchestrationProjectionPipelineLive } from "../../orchestration/Layers/ProjectionPipeline.ts"; +import { RuntimeReceiptBus } from "../../orchestration/Services/RuntimeReceiptBus.ts"; +import { OrchestrationEngineService } from "../../orchestration/Services/OrchestrationEngine.ts"; +import { + ProviderService, + type ProviderServiceShape, +} from "../../provider/Services/ProviderService.ts"; +import { SkillCatalog, type SkillCatalogShape } from "../Services/SkillCatalog.ts"; +import { SubagentCoordinator } from "../Services/SubagentCoordinator.ts"; +import { SubagentCoordinatorLive } from "./SubagentCoordinator.ts"; + +const asProjectId = (value: string): ProjectId => ProjectId.makeUnsafe(value); +const asThreadId = (value: string): ThreadId => ThreadId.makeUnsafe(value); + +async function waitFor( + predicate: () => boolean | Promise, + timeoutMs = 2_000, +): Promise { + const deadline = Date.now() + timeoutMs; + while (!(await predicate())) { + if (Date.now() >= deadline) { + throw new Error("Timed out waiting for expectation."); + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } +} + +describe("SubagentCoordinator", () => { + let runtime: ManagedRuntime.ManagedRuntime< + OrchestrationEngineService | SubagentCoordinator, + unknown + > | null = null; + let scope: Scope.Closeable | null = null; + const createdStateDirs = new Set(); + + afterEach(async () => { + if (scope) { + await Effect.runPromise(Scope.close(scope, Exit.void)); + } + scope = null; + if (runtime) { + await runtime.dispose(); + } + runtime = null; + for (const stateDir of createdStateDirs) { + fs.rmSync(stateDir, { recursive: true, force: true }); + } + createdStateDirs.clear(); + }); + + it("detaches visible worktree threads when a retained subagent run is cleaned up", async () => { + const runtimeEventPubSub = Effect.runSync(PubSub.unbounded()); + const receiptPubSub = Effect.runSync(PubSub.unbounded()); + const stopSession = vi.fn(() => Effect.void); + const removeWorktree = vi.fn(() => Effect.void); + const deleteLocalBranch = vi.fn(() => Effect.void); + + const providerService: ProviderServiceShape = { + startSession: () => Effect.die("unused"), + sendTurn: () => Effect.die("unused"), + interruptTurn: () => Effect.die("unused"), + respondToRequest: () => Effect.die("unused"), + respondToUserInput: () => Effect.die("unused"), + stopSession, + listSessions: () => Effect.succeed([] satisfies ReadonlyArray), + getCapabilities: () => + Effect.succeed({ + sessionModelSwitch: "in-session", + }), + rollbackConversation: () => Effect.die("unused"), + streamEvents: Stream.fromPubSub(runtimeEventPubSub), + }; + + const skillCatalog: SkillCatalogShape = { + listSkills: () => Effect.succeed([]), + getSkillById: () => Effect.die("unused"), + }; + + const git: Partial = { + removeWorktree, + deleteLocalBranch, + }; + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3code-subagent-")); + createdStateDirs.add(stateDir); + + const orchestrationLayer = OrchestrationEngineLive.pipe( + Layer.provide(OrchestrationProjectionPipelineLive), + Layer.provide(OrchestrationEventStoreLive), + Layer.provide(OrchestrationCommandReceiptRepositoryLive), + Layer.provide(SqlitePersistenceMemory), + ); + + const managedRuntime = ManagedRuntime.make( + SubagentCoordinatorLive.pipe( + Layer.provideMerge(orchestrationLayer), + Layer.provideMerge(Layer.succeed(ProviderService, providerService)), + Layer.provideMerge(Layer.succeed(GitCore, git as GitCoreShape)), + Layer.provideMerge(Layer.succeed(SkillCatalog, skillCatalog)), + Layer.provideMerge( + Layer.succeed(RuntimeReceiptBus, { + publish: () => Effect.void, + stream: Stream.fromPubSub(receiptPubSub), + }), + ), + Layer.provideMerge(ServerConfig.layerTest(process.cwd(), stateDir)), + Layer.provideMerge(NodeServices.layer), + ), + ); + runtime = managedRuntime; + + const engine = await managedRuntime.runPromise(Effect.service(OrchestrationEngineService)); + const coordinator = await managedRuntime.runPromise(Effect.service(SubagentCoordinator)); + scope = await Effect.runPromise(Scope.make("sequential")); + await Effect.runPromise(coordinator.start.pipe(Scope.provide(scope))); + await new Promise((resolve) => setTimeout(resolve, 0)); + + const createdAt = "2026-03-15T23:00:00.000Z"; + const projectId = asProjectId("project-1"); + const parentThreadId = asThreadId("thread-parent"); + const hiddenThreadId = asThreadId("thread-subagent"); + const visibleThreadId = asThreadId("thread-visible"); + const run: SubagentRun = { + id: "run-1", + parentThreadId, + subagentThreadId: hiddenThreadId, + skillId: "frontend-design", + skillTitle: "Frontend Design", + task: "Rewrite the homepage hero copy and update the UI.", + status: "retained", + branch: "t3code/subagent-frontend-design-run-1", + worktreePath: "/tmp/subagent-worktree", + report: null, + lastError: null, + createdAt, + updatedAt: createdAt, + completedAt: createdAt, + acceptedAt: createdAt, + }; + + await managedRuntime.runPromise( + engine.dispatch({ + type: "project.create", + commandId: CommandId.makeUnsafe("cmd-project-create"), + projectId, + title: "Project", + workspaceRoot: "/tmp/project-root", + defaultModel: "gpt-5.4", + createdAt, + }), + ); + await managedRuntime.runPromise( + engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-parent"), + threadId: parentThreadId, + projectId, + title: "Parent thread", + model: "gpt-5.4", + runtimeMode: "full-access", + interactionMode: "default", + branch: "main", + worktreePath: null, + createdAt, + }), + ); + await managedRuntime.runPromise( + engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-hidden"), + threadId: hiddenThreadId, + projectId, + title: "Hidden subagent thread", + model: "gpt-5.4", + runtimeMode: "full-access", + interactionMode: "default", + threadKind: "subagent", + parentThreadId, + branch: run.branch, + worktreePath: run.worktreePath, + createdAt, + }), + ); + await managedRuntime.runPromise( + engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-visible"), + threadId: visibleThreadId, + projectId, + title: "Visible worktree thread", + model: "gpt-5.4", + runtimeMode: "full-access", + interactionMode: "default", + branch: run.branch, + worktreePath: run.worktreePath, + createdAt, + }), + ); + await managedRuntime.runPromise( + engine.dispatch({ + type: "thread.subagent.upsert", + commandId: CommandId.makeUnsafe("cmd-run-upsert"), + threadId: parentThreadId, + subagentRun: run, + createdAt, + }), + ); + + await managedRuntime.runPromise( + engine.dispatch({ + type: "thread.subagent.cleanup", + commandId: CommandId.makeUnsafe("cmd-run-cleanup"), + threadId: parentThreadId, + runId: run.id, + createdAt, + }), + ); + + await waitFor(async () => { + const readModel = await managedRuntime.runPromise(engine.getReadModel()); + const parentThread = readModel.threads.find((thread) => thread.id === parentThreadId); + const visibleThread = readModel.threads.find((thread) => thread.id === visibleThreadId); + const hiddenThread = readModel.threads.find((thread) => thread.id === hiddenThreadId); + const cleanedRun = parentThread?.subagentRuns?.find((candidate) => candidate.id === run.id); + return ( + visibleThread?.branch === null && + visibleThread.worktreePath === null && + hiddenThread?.deletedAt !== null && + hiddenThread?.deletedAt !== undefined && + cleanedRun?.status === "cleaned_up" && + cleanedRun.branch === null && + cleanedRun.worktreePath === null + ); + }); + + const readModel = await managedRuntime.runPromise(engine.getReadModel()); + const visibleThread = readModel.threads.find((thread) => thread.id === visibleThreadId); + const parentThread = readModel.threads.find((thread) => thread.id === parentThreadId); + const hiddenThread = readModel.threads.find((thread) => thread.id === hiddenThreadId); + const cleanedRun = parentThread?.subagentRuns?.find((candidate) => candidate.id === run.id); + + expect(visibleThread).toMatchObject({ + id: visibleThreadId, + branch: null, + worktreePath: null, + }); + expect(hiddenThread?.deletedAt).toEqual(expect.any(String)); + expect(cleanedRun).toMatchObject({ + id: run.id, + status: "cleaned_up", + branch: null, + worktreePath: null, + }); + expect(stopSession).toHaveBeenCalledWith({ threadId: hiddenThreadId }); + expect(removeWorktree).toHaveBeenCalledWith({ + cwd: "/tmp/project-root", + path: run.worktreePath, + force: true, + }); + expect(deleteLocalBranch).toHaveBeenCalledWith({ + cwd: "/tmp/project-root", + branch: run.branch, + force: true, + }); + }); +}); diff --git a/apps/server/src/subagents/Layers/SubagentCoordinator.ts b/apps/server/src/subagents/Layers/SubagentCoordinator.ts new file mode 100644 index 0000000000..976a829612 --- /dev/null +++ b/apps/server/src/subagents/Layers/SubagentCoordinator.ts @@ -0,0 +1,584 @@ +import { + CommandId, + MessageId, + ThreadId, + type GitStatusResult, + type OrchestrationEvent, + type OrchestrationReadModel, + type OrchestrationThread, + type SubagentReport, + type SubagentRun, + type SubagentSkill, +} from "@t3tools/contracts"; +import { Cause, Effect, Layer, Option, Stream } from "effect"; +import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; + +import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; +import { GitCore } from "../../git/Services/GitCore.ts"; +import { OrchestrationEngineService } from "../../orchestration/Services/OrchestrationEngine.ts"; +import { + RuntimeReceiptBus, + type TurnProcessingQuiescedReceipt, +} from "../../orchestration/Services/RuntimeReceiptBus.ts"; +import { ProviderService } from "../../provider/Services/ProviderService.ts"; +import { SkillCatalog } from "../Services/SkillCatalog.ts"; +import { + SubagentCoordinator, + type SubagentCoordinatorShape, +} from "../Services/SubagentCoordinator.ts"; + +type SubagentDomainEvent = Extract< + OrchestrationEvent, + { + type: + | "thread.subagent-start-requested" + | "thread.subagent-report-accepted" + | "thread.subagent-cleanup-requested"; + } +>; + +type SubagentCoordinatorInput = + | { + readonly kind: "event"; + readonly event: SubagentDomainEvent; + } + | { + readonly kind: "receipt"; + readonly receipt: TurnProcessingQuiescedReceipt; + }; + +const serverCommandId = (tag: string): CommandId => + CommandId.makeUnsafe(`server:subagent:${tag}:${crypto.randomUUID()}`); + +function sanitizeBranchFragment(value: string): string { + return value + .trim() + .toLowerCase() + .replace(/[^a-z0-9/_-]+/g, "-") + .replace(/\/+/g, "/") + .replace(/-+/g, "-") + .replace(/^[./_-]+|[./_-]+$/g, "") + .slice(0, 48); +} + +function buildSubagentBranchName(skillId: string, runId: string): string { + const skillFragment = sanitizeBranchFragment(skillId) || "skill"; + const runFragment = sanitizeBranchFragment(runId).slice(-8) || crypto.randomUUID().slice(0, 8); + return `t3code/subagent-${skillFragment}-${runFragment}`; +} + +function buildSubagentThreadTitle(skill: SubagentSkill, task: string): string { + const suffix = task.trim().slice(0, 64); + return suffix.length > 0 ? `${skill.title}: ${suffix}` : skill.title; +} + +function buildSubagentDeveloperInstructions(skill: SubagentSkill): string { + return [ + "You are a specialist sub-agent running in an isolated git worktree.", + "Work only inside this worktree.", + "When you finish, your final assistant message must use this exact markdown structure:", + "## Summary", + "## Findings", + "- ...", + "## Actions Taken", + "- ...", + "## Recommended Actions", + "- ...", + "If a section has nothing to report, write `None.` under it.", + "", + `# Skill: ${skill.title}`, + "", + skill.promptMarkdown, + ].join("\n"); +} + +function extractSection(markdown: string, heading: string): string | null { + const pattern = new RegExp(`^##\\s+${heading}\\s*$([\\s\\S]*?)(?=^##\\s+|$)`, "im"); + const match = pattern.exec(markdown); + const value = match?.[1]?.trim(); + return value && value.length > 0 ? value : null; +} + +function extractBulletLines(markdown: string | null): string[] { + if (!markdown) { + return []; + } + const bullets = markdown + .split(/\r?\n/g) + .map((line) => line.trim()) + .filter((line) => /^[-*]\s+/.test(line)) + .map((line) => line.replace(/^[-*]\s+/, "").trim()) + .filter((line) => line.length > 0 && line.toLowerCase() !== "none."); + return bullets; +} + +function extractSummary(markdown: string): string | null { + const explicit = extractSection(markdown, "Summary"); + if (explicit) { + const singleLine = explicit + .split(/\r?\n/g) + .find((line) => line.trim().length > 0) + ?.trim(); + if (singleLine && singleLine.toLowerCase() !== "none.") { + return singleLine; + } + } + const firstParagraph = markdown + .split(/\r?\n\r?\n/g) + .map((paragraph) => paragraph.trim()) + .find((paragraph) => paragraph.length > 0 && !paragraph.startsWith("#")); + return firstParagraph ?? null; +} + +function synthesizeReport( + markdown: string, + filesChanged: ReadonlyArray, +): SubagentReport | null { + const normalizedMarkdown = markdown.trim(); + const summary = extractSummary(normalizedMarkdown); + if (!summary) { + return null; + } + return { + summary, + markdown: normalizedMarkdown, + findings: extractBulletLines(extractSection(normalizedMarkdown, "Findings")), + actionsTaken: extractBulletLines(extractSection(normalizedMarkdown, "Actions Taken")), + recommendedActions: extractBulletLines( + extractSection(normalizedMarkdown, "Recommended Actions"), + ), + filesChanged: [...filesChanged], + generatedAt: new Date().toISOString(), + }; +} + +function findProjectRoot( + readModel: OrchestrationReadModel, + thread: OrchestrationThread, +): string | null { + return ( + readModel.projects.find((project) => project.id === thread.projectId)?.workspaceRoot ?? null + ); +} + +function findRun(thread: OrchestrationThread, runId: string): SubagentRun | null { + return thread.subagentRuns?.find((run) => run.id === runId) ?? null; +} + +const EMPTY_GIT_STATUS: GitStatusResult = { + branch: null, + hasWorkingTreeChanges: false, + workingTree: { files: [], insertions: 0, deletions: 0 }, + hasUpstream: false, + aheadCount: 0, + behindCount: 0, + pr: null, +}; + +const makeSubagentCoordinator = Effect.gen(function* () { + const orchestrationEngine = yield* OrchestrationEngineService; + const git = yield* GitCore; + const providerService = yield* ProviderService; + const skillCatalog = yield* SkillCatalog; + const runtimeReceiptBus = yield* RuntimeReceiptBus; + + const upsertRun = (parentThreadId: ThreadId, run: SubagentRun, createdAt = run.updatedAt) => + orchestrationEngine.dispatch({ + type: "thread.subagent.upsert", + commandId: serverCommandId("upsert"), + threadId: parentThreadId, + subagentRun: run, + createdAt, + }); + + const detachVisibleThreadsFromRun = (run: SubagentRun) => + Effect.gen(function* () { + if (!run.worktreePath && !run.branch) { + return; + } + const readModel = yield* orchestrationEngine.getReadModel(); + const impactedThreads = readModel.threads.filter( + (thread) => + thread.id !== run.subagentThreadId && + ((run.worktreePath !== null && thread.worktreePath === run.worktreePath) || + (run.branch !== null && thread.branch === run.branch)), + ); + for (const thread of impactedThreads) { + yield* orchestrationEngine + .dispatch({ + type: "thread.meta.update", + commandId: serverCommandId("detach-cleaned-worktree"), + threadId: thread.id, + branch: null, + worktreePath: null, + }) + .pipe(Effect.catch(() => Effect.void)); + } + }); + + const cleanupResources = (input: { + readonly projectRoot: string; + readonly run: SubagentRun; + readonly force: boolean; + }) => + Effect.gen(function* () { + if (input.run.subagentThreadId) { + yield* providerService + .stopSession({ threadId: input.run.subagentThreadId }) + .pipe(Effect.catch(() => Effect.void)); + } + if (input.run.worktreePath) { + yield* git.removeWorktree({ + cwd: input.projectRoot, + path: input.run.worktreePath, + force: input.force, + }); + } + if (input.run.branch) { + yield* git.deleteLocalBranch({ + cwd: input.projectRoot, + branch: input.run.branch, + force: input.force, + }); + } + yield* detachVisibleThreadsFromRun(input.run); + if (input.run.subagentThreadId) { + yield* orchestrationEngine + .dispatch({ + type: "thread.delete", + commandId: serverCommandId("delete-thread"), + threadId: input.run.subagentThreadId, + }) + .pipe(Effect.catch(() => Effect.void)); + } + }); + + const processStartRequested = ( + event: Extract, + ) => + Effect.gen(function* () { + const readModel = yield* orchestrationEngine.getReadModel(); + const parentThread = readModel.threads.find((thread) => thread.id === event.payload.threadId); + const existingRun = parentThread ? findRun(parentThread, event.payload.runId) : null; + if (!parentThread || !existingRun) { + return; + } + + const skillOption = yield* skillCatalog.getSkillById(event.payload.skillId); + if (Option.isNone(skillOption)) { + yield* upsertRun(parentThread.id, { + ...existingRun, + status: "failed", + lastError: `Unknown skill '${event.payload.skillId}'.`, + updatedAt: new Date().toISOString(), + completedAt: new Date().toISOString(), + }); + return; + } + + const skill = skillOption.value; + const projectRoot = findProjectRoot(readModel, parentThread); + const workspaceCwd = resolveThreadWorkspaceCwd({ + thread: parentThread, + projects: readModel.projects, + }); + if (!projectRoot || !workspaceCwd) { + yield* upsertRun(parentThread.id, { + ...existingRun, + skillTitle: skill.title, + status: "failed", + lastError: "Could not resolve the parent thread workspace.", + updatedAt: new Date().toISOString(), + completedAt: new Date().toISOString(), + }); + return; + } + + let createdThreadId: ThreadId | null = null; + let nextRun: SubagentRun = { + ...existingRun, + skillTitle: skill.title, + }; + + try { + const baseBranch = parentThread.branch ?? (yield* git.status({ cwd: projectRoot })).branch; + if (!baseBranch) { + throw new Error("Could not resolve a git base branch for the sub-agent worktree."); + } + + const branchName = buildSubagentBranchName(skill.id, existingRun.id); + const worktree = yield* git.createWorktree({ + cwd: projectRoot, + branch: baseBranch, + newBranch: branchName, + path: null, + }); + + createdThreadId = ThreadId.makeUnsafe(crypto.randomUUID()); + nextRun = { + ...nextRun, + subagentThreadId: createdThreadId, + branch: worktree.worktree.branch, + worktreePath: worktree.worktree.path, + status: "running", + updatedAt: new Date().toISOString(), + }; + yield* orchestrationEngine.dispatch({ + type: "thread.create", + commandId: serverCommandId("create-thread"), + threadId: createdThreadId, + projectId: parentThread.projectId, + title: buildSubagentThreadTitle(skill, event.payload.task), + model: parentThread.model, + runtimeMode: parentThread.runtimeMode, + interactionMode: "default", + threadKind: "subagent", + parentThreadId: parentThread.id, + branch: worktree.worktree.branch, + worktreePath: worktree.worktree.path, + createdAt: event.payload.createdAt, + }); + yield* upsertRun(parentThread.id, nextRun); + yield* orchestrationEngine.dispatch({ + type: "thread.turn.start", + commandId: serverCommandId("turn-start"), + threadId: createdThreadId, + message: { + messageId: MessageId.makeUnsafe(crypto.randomUUID()), + role: "user", + text: event.payload.task, + attachments: [], + }, + model: parentThread.model, + runtimeMode: parentThread.runtimeMode, + interactionMode: "default", + developerInstructions: buildSubagentDeveloperInstructions(skill), + createdAt: event.payload.createdAt, + }); + } catch (cause) { + if (nextRun.worktreePath || nextRun.branch) { + yield* cleanupResources({ + projectRoot, + run: nextRun, + force: true, + }).pipe(Effect.catch(() => Effect.void)); + } + yield* upsertRun(parentThread.id, { + ...nextRun, + status: "failed", + branch: null, + worktreePath: null, + subagentThreadId: createdThreadId, + lastError: cause instanceof Error ? cause.message : String(cause), + updatedAt: new Date().toISOString(), + completedAt: new Date().toISOString(), + }); + } + }); + + const processQuiescedReceipt = (receipt: TurnProcessingQuiescedReceipt) => + Effect.gen(function* () { + const readModel = yield* orchestrationEngine.getReadModel(); + const hiddenThread = readModel.threads.find( + (thread) => + thread.id === receipt.threadId && + thread.threadKind === "subagent" && + typeof thread.parentThreadId === "string", + ); + if (!hiddenThread || !hiddenThread.parentThreadId) { + return; + } + const parentThread = readModel.threads.find( + (thread) => thread.id === hiddenThread.parentThreadId, + ); + if (!parentThread) { + return; + } + const run = + parentThread.subagentRuns?.find( + (candidate) => + candidate.subagentThreadId === hiddenThread.id && + (candidate.status === "preparing" || candidate.status === "running"), + ) ?? null; + if (!run) { + return; + } + + const assistantMessage = + hiddenThread.messages.findLast( + (message) => message.role === "assistant" && message.turnId === receipt.turnId, + ) ?? + hiddenThread.messages.findLast((message) => message.role === "assistant") ?? + null; + const changedFiles = + hiddenThread.checkpoints + .find((checkpoint) => checkpoint.turnId === receipt.turnId) + ?.files.map((file) => file.path) ?? []; + const report = assistantMessage?.text + ? synthesizeReport(assistantMessage.text, changedFiles) + : null; + + yield* providerService + .stopSession({ threadId: hiddenThread.id }) + .pipe(Effect.catch(() => Effect.void)); + yield* upsertRun(parentThread.id, { + ...run, + status: report ? "report_ready" : "failed", + report, + lastError: report ? null : "The sub-agent finished without a structured final report.", + updatedAt: new Date().toISOString(), + completedAt: new Date().toISOString(), + }); + }); + + const processReportAccepted = ( + event: Extract, + ) => + Effect.gen(function* () { + const readModel = yield* orchestrationEngine.getReadModel(); + const parentThread = readModel.threads.find((thread) => thread.id === event.payload.threadId); + const run = parentThread ? findRun(parentThread, event.payload.runId) : null; + if (!parentThread || !run) { + return; + } + const projectRoot = findProjectRoot(readModel, parentThread); + const acceptedAt = new Date().toISOString(); + if (!projectRoot || !run.worktreePath) { + yield* upsertRun(parentThread.id, { + ...run, + status: "accepted", + acceptedAt, + updatedAt: acceptedAt, + }); + return; + } + const status = yield* git + .status({ cwd: run.worktreePath }) + .pipe(Effect.catch(() => Effect.succeed(EMPTY_GIT_STATUS))); + if (status.hasWorkingTreeChanges) { + yield* upsertRun(parentThread.id, { + ...run, + status: "retained", + acceptedAt, + updatedAt: acceptedAt, + }); + return; + } + try { + yield* cleanupResources({ + projectRoot, + run, + force: false, + }); + yield* upsertRun(parentThread.id, { + ...run, + status: "cleaned_up", + acceptedAt, + branch: null, + worktreePath: null, + updatedAt: acceptedAt, + }); + } catch (cause) { + yield* upsertRun(parentThread.id, { + ...run, + status: "cleanup_failed", + acceptedAt, + lastError: cause instanceof Error ? cause.message : String(cause), + updatedAt: acceptedAt, + }); + } + }); + + const processCleanupRequested = ( + event: Extract, + ) => + Effect.gen(function* () { + const readModel = yield* orchestrationEngine.getReadModel(); + const parentThread = readModel.threads.find((thread) => thread.id === event.payload.threadId); + const run = parentThread ? findRun(parentThread, event.payload.runId) : null; + const projectRoot = parentThread ? findProjectRoot(readModel, parentThread) : null; + if (!parentThread || !run || !projectRoot) { + return; + } + try { + yield* cleanupResources({ + projectRoot, + run, + force: true, + }); + yield* upsertRun(parentThread.id, { + ...run, + status: "cleaned_up", + branch: null, + worktreePath: null, + updatedAt: new Date().toISOString(), + }); + } catch (cause) { + yield* upsertRun(parentThread.id, { + ...run, + status: "cleanup_failed", + lastError: cause instanceof Error ? cause.message : String(cause), + updatedAt: new Date().toISOString(), + }); + } + }); + + const processInput = (input: SubagentCoordinatorInput) => + input.kind === "event" + ? input.event.type === "thread.subagent-start-requested" + ? processStartRequested(input.event) + : input.event.type === "thread.subagent-report-accepted" + ? processReportAccepted(input.event) + : processCleanupRequested(input.event) + : processQuiescedReceipt(input.receipt); + + const processInputSafely = (input: SubagentCoordinatorInput) => + processInput(input).pipe( + Effect.catchCause((cause) => { + if (Cause.hasInterruptsOnly(cause)) { + return Effect.failCause(cause); + } + return Effect.logWarning("subagent coordinator failed to process input", { + kind: input.kind, + cause: Cause.pretty(cause), + ...(input.kind === "event" + ? { eventType: input.event.type } + : { receiptType: input.receipt.type, threadId: input.receipt.threadId }), + }); + }), + ); + + const worker = yield* makeDrainableWorker(processInputSafely); + + const start: SubagentCoordinatorShape["start"] = Effect.all( + [ + Effect.forkScoped( + Stream.runForEach(orchestrationEngine.streamDomainEvents, (event) => { + if ( + event.type !== "thread.subagent-start-requested" && + event.type !== "thread.subagent-report-accepted" && + event.type !== "thread.subagent-cleanup-requested" + ) { + return Effect.void; + } + return worker.enqueue({ kind: "event", event }); + }), + ), + Effect.forkScoped( + Stream.runForEach(runtimeReceiptBus.stream, (receipt) => { + if (receipt.type !== "turn.processing.quiesced") { + return Effect.void; + } + return worker.enqueue({ kind: "receipt", receipt }); + }), + ), + ], + { concurrency: 1 }, + ).pipe(Effect.asVoid); + + return { + start, + } satisfies SubagentCoordinatorShape; +}); + +export const SubagentCoordinatorLive = Layer.effect(SubagentCoordinator, makeSubagentCoordinator); diff --git a/apps/server/src/subagents/Services/SkillCatalog.ts b/apps/server/src/subagents/Services/SkillCatalog.ts new file mode 100644 index 0000000000..1bc357d74d --- /dev/null +++ b/apps/server/src/subagents/Services/SkillCatalog.ts @@ -0,0 +1,12 @@ +import { SubagentSkill } from "@t3tools/contracts"; +import { Option, ServiceMap } from "effect"; +import type { Effect } from "effect"; + +export interface SkillCatalogShape { + readonly listSkills: () => Effect.Effect>; + readonly getSkillById: (skillId: string) => Effect.Effect>; +} + +export class SkillCatalog extends ServiceMap.Service()( + "t3/subagents/Services/SkillCatalog", +) {} diff --git a/apps/server/src/subagents/Services/SubagentCoordinator.ts b/apps/server/src/subagents/Services/SubagentCoordinator.ts new file mode 100644 index 0000000000..b12bae1207 --- /dev/null +++ b/apps/server/src/subagents/Services/SubagentCoordinator.ts @@ -0,0 +1,11 @@ +import { ServiceMap } from "effect"; +import type { Effect, Scope } from "effect"; + +export interface SubagentCoordinatorShape { + readonly start: Effect.Effect; +} + +export class SubagentCoordinator extends ServiceMap.Service< + SubagentCoordinator, + SubagentCoordinatorShape +>()("t3/subagents/Services/SubagentCoordinator") {} diff --git a/apps/server/src/terminal/Layers/NodePTY.ts b/apps/server/src/terminal/Layers/NodePTY.ts index 60a7ab6220..d093a1b8e7 100644 --- a/apps/server/src/terminal/Layers/NodePTY.ts +++ b/apps/server/src/terminal/Layers/NodePTY.ts @@ -89,8 +89,7 @@ export const NodePtyAdapterLive = Layer.effect( Effect.gen(function* () { const fs = yield* FileSystem.FileSystem; const path = yield* Path.Path; - - const nodePty = yield* Effect.promise(() => import("node-pty")); + const loadNodePty = yield* Effect.cached(Effect.promise(() => import("node-pty"))); const ensureNodePtySpawnHelperExecutableCached = yield* Effect.cached( ensureNodePtySpawnHelperExecutable().pipe( @@ -102,6 +101,7 @@ export const NodePtyAdapterLive = Layer.effect( return { spawn: Effect.fn(function* (input) { + const nodePty = yield* loadNodePty; yield* ensureNodePtySpawnHelperExecutableCached; const ptyProcess = nodePty.spawn(input.shell, input.args ?? [], { cwd: input.cwd, diff --git a/apps/server/src/wsServer.test.ts b/apps/server/src/wsServer.test.ts index f12792a318..911e0392cf 100644 --- a/apps/server/src/wsServer.test.ts +++ b/apps/server/src/wsServer.test.ts @@ -830,6 +830,7 @@ describe("WebSocket Server", () => { keybindings: DEFAULT_RESOLVED_KEYBINDINGS, issues: [], providers: defaultProviderStatuses, + subagentSkills: expect.any(Array), availableEditors: expect.any(Array), }); expectAvailableEditors((response.result as { availableEditors: unknown }).availableEditors); @@ -855,6 +856,7 @@ describe("WebSocket Server", () => { keybindings: DEFAULT_RESOLVED_KEYBINDINGS, issues: [], providers: defaultProviderStatuses, + subagentSkills: expect.any(Array), availableEditors: expect.any(Array), }); expectAvailableEditors((response.result as { availableEditors: unknown }).availableEditors); @@ -890,6 +892,7 @@ describe("WebSocket Server", () => { }, ], providers: defaultProviderStatuses, + subagentSkills: expect.any(Array), availableEditors: expect.any(Array), }); expectAvailableEditors((response.result as { availableEditors: unknown }).availableEditors); @@ -1037,6 +1040,7 @@ describe("WebSocket Server", () => { keybindings: compileKeybindings(persistedConfig), issues: [], providers: defaultProviderStatuses, + subagentSkills: expect.any(Array), availableEditors: expect.any(Array), }); expectAvailableEditors((response.result as { availableEditors: unknown }).availableEditors); @@ -1084,6 +1088,7 @@ describe("WebSocket Server", () => { keybindings: compileKeybindings(persistedConfig), issues: [], providers: defaultProviderStatuses, + subagentSkills: expect.any(Array), availableEditors: expect.any(Array), }); expectAvailableEditors( diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index 2e6ac51b7f..f1b6f50555 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -78,6 +78,7 @@ import { expandHomePath } from "./os-jank.ts"; import { makeServerPushBus } from "./wsServer/pushBus.ts"; import { makeServerReadiness } from "./wsServer/readiness.ts"; import { decodeJsonResult, formatSchemaError } from "@t3tools/shared/schemaJson"; +import { SkillCatalog } from "./subagents/Services/SkillCatalog.ts"; /** * ServerShape - Service API for server lifecycle control. @@ -217,7 +218,8 @@ export type ServerRuntimeServices = | TerminalManager | Keybindings | Open - | AnalyticsService; + | AnalyticsService + | SkillCatalog; export class ServerLifecycleError extends Schema.TaggedErrorClass()( "ServerLifecycleError", @@ -255,6 +257,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< const keybindingsManager = yield* Keybindings; const providerHealth = yield* ProviderHealth; const git = yield* GitCore; + const skillCatalog = yield* SkillCatalog; const fileSystem = yield* FileSystem.FileSystem; const path = yield* Path.Path; @@ -868,6 +871,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< case WS_METHODS.serverGetConfig: const keybindingsConfig = yield* keybindingsManager.loadConfigState; + const subagentSkills = yield* skillCatalog.listSkills(); return { cwd, keybindingsConfigPath, @@ -875,6 +879,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< issues: keybindingsConfig.issues, providers: providerStatuses, availableEditors, + subagentSkills, }; case WS_METHODS.serverUpsertKeybinding: { diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 52637695e6..2d460ba485 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -15,6 +15,8 @@ import { type ProviderApprovalDecision, type ServerProviderStatus, type ProviderKind, + type SubagentRun, + type SubagentSkill, type ThreadId, type TurnId, OrchestrationThreadActivity, @@ -44,6 +46,7 @@ import { detectComposerTrigger, expandCollapsedComposerCursor, parseStandaloneComposerSlashCommand, + parseSkillInvocation, replaceTextRange, } from "../composer-logic"; import { @@ -169,6 +172,7 @@ const EMPTY_KEYBINDINGS: ResolvedKeybindingsConfig = []; const EMPTY_PROJECT_ENTRIES: ProjectEntry[] = []; const EMPTY_AVAILABLE_EDITORS: EditorId[] = []; const EMPTY_PROVIDER_STATUSES: ServerProviderStatus[] = []; +const EMPTY_SUBAGENT_SKILLS: SubagentSkill[] = []; const EMPTY_PENDING_USER_INPUT_ANSWERS: Record = {}; const COMPOSER_PATH_QUERY_DEBOUNCE_MS = 120; const SCRIPT_TERMINAL_COLS = 120; @@ -811,8 +815,13 @@ export default function ChatView({ threadId }: ChatViewProps) { }, [serverMessages, attachmentPreviewHandoffByMessageId, optimisticUserMessages]); const timelineEntries = useMemo( () => - deriveTimelineEntries(timelineMessages, activeThread?.proposedPlans ?? [], workLogEntries), - [activeThread?.proposedPlans, timelineMessages, workLogEntries], + deriveTimelineEntries( + timelineMessages, + activeThread?.proposedPlans ?? [], + workLogEntries, + activeThread?.subagentRuns ?? [], + ), + [activeThread?.proposedPlans, activeThread?.subagentRuns, timelineMessages, workLogEntries], ); const { turnDiffSummaries, inferredCheckpointTurnCountByTurnId } = useTurnDiffSummaries(activeThread); @@ -923,6 +932,10 @@ export default function ChatView({ threadId }: ChatViewProps) { }), ); const workspaceEntries = workspaceEntriesQuery.data?.entries ?? EMPTY_PROJECT_ENTRIES; + const keybindings = serverConfigQuery.data?.keybindings ?? EMPTY_KEYBINDINGS; + const availableEditors = serverConfigQuery.data?.availableEditors ?? EMPTY_AVAILABLE_EDITORS; + const providerStatuses = serverConfigQuery.data?.providers ?? EMPTY_PROVIDER_STATUSES; + const subagentSkills = serverConfigQuery.data?.subagentSkills ?? EMPTY_SUBAGENT_SKILLS; const composerMenuItems = useMemo(() => { if (!composerTrigger) return []; if (composerTrigger.kind === "path") { @@ -959,6 +972,13 @@ export default function ChatView({ threadId }: ChatViewProps) { label: "/default", description: "Switch this thread back to normal chat mode", }, + { + id: "slash:skill", + type: "slash-command", + command: "skill", + label: "/skill", + description: "Run a specialist sub-agent in an isolated worktree", + }, ] satisfies ReadonlyArray>; const query = composerTrigger.query.trim().toLowerCase(); if (!query) { @@ -969,6 +989,26 @@ export default function ChatView({ threadId }: ChatViewProps) { ); } + if (composerTrigger.kind === "slash-skill") { + const query = composerTrigger.query.trim().toLowerCase(); + return subagentSkills + .filter((skill) => { + if (!query) return true; + return ( + skill.id.toLowerCase().includes(query) || + skill.title.toLowerCase().includes(query) || + (skill.summary?.toLowerCase().includes(query) ?? false) + ); + }) + .map((skill) => ({ + id: `skill:${skill.id}`, + type: "skill" as const, + skill, + label: skill.id, + description: skill.summary ?? skill.title, + })); + } + return searchableModelOptions .filter(({ searchSlug, searchName, searchProvider }) => { const query = composerTrigger.query.trim().toLowerCase(); @@ -985,7 +1025,7 @@ export default function ChatView({ threadId }: ChatViewProps) { label: name, description: `${providerLabel} ยท ${slug}`, })); - }, [composerTrigger, searchableModelOptions, workspaceEntries]); + }, [composerTrigger, searchableModelOptions, subagentSkills, workspaceEntries]); const composerMenuOpen = Boolean(composerTrigger); const activeComposerMenuItem = useMemo( () => @@ -1001,9 +1041,6 @@ export default function ChatView({ threadId }: ChatViewProps) { () => new Set(nonPersistedComposerImageIds), [nonPersistedComposerImageIds], ); - const keybindings = serverConfigQuery.data?.keybindings ?? EMPTY_KEYBINDINGS; - const availableEditors = serverConfigQuery.data?.availableEditors ?? EMPTY_AVAILABLE_EDITORS; - const providerStatuses = serverConfigQuery.data?.providers ?? EMPTY_PROVIDER_STATUSES; const activeProvider = activeThread?.session?.provider ?? "codex"; const activeProviderStatus = useMemo( () => providerStatuses.find((status) => status.provider === activeProvider) ?? null, @@ -2232,6 +2269,37 @@ export default function ChatView({ threadId }: ChatViewProps) { setComposerTrigger(null); return; } + const skillInvocation = parseSkillInvocation(trimmed); + if (skillInvocation) { + if (composerImages.length > 0) { + setStoreThreadError(activeThread.id, "Sub-agent runs do not support attachments yet."); + return; + } + const commandId = newCommandId(); + try { + await api.orchestration.dispatchCommand({ + type: "thread.subagent.start", + commandId, + threadId: activeThread.id, + runId: commandId, + skillId: skillInvocation.skillId, + task: skillInvocation.task, + createdAt: new Date().toISOString(), + }); + } catch (error) { + setStoreThreadError( + activeThread.id, + error instanceof Error ? error.message : "Failed to start the sub-agent.", + ); + return; + } + promptRef.current = ""; + clearComposerDraftContent(activeThread.id); + setComposerHighlightedItemId(null); + setComposerCursor(0); + setComposerTrigger(null); + return; + } if (!trimmed && composerImages.length === 0) return; if (!activeProject) return; const threadIdForSend = activeThread.id; @@ -3038,6 +3106,24 @@ export default function ChatView({ threadId }: ChatViewProps) { } return; } + if (item.command === "skill") { + const replacement = "/skill "; + const replacementRangeEnd = extendReplacementRangeForTrailingSpace( + snapshot.value, + trigger.rangeEnd, + replacement, + ); + const applied = applyPromptReplacement( + trigger.rangeStart, + replacementRangeEnd, + replacement, + { expectedText: snapshot.value.slice(trigger.rangeStart, replacementRangeEnd) }, + ); + if (applied) { + setComposerHighlightedItemId(null); + } + return; + } void handleInteractionModeChange(item.command === "plan" ? "plan" : "default"); const applied = applyPromptReplacement(trigger.rangeStart, trigger.rangeEnd, "", { expectedText: snapshot.value.slice(trigger.rangeStart, trigger.rangeEnd), @@ -3047,6 +3133,24 @@ export default function ChatView({ threadId }: ChatViewProps) { } return; } + if (item.type === "skill") { + const replacement = `/skill ${item.skill.id} `; + const replacementRangeEnd = extendReplacementRangeForTrailingSpace( + snapshot.value, + trigger.rangeEnd, + replacement, + ); + const applied = applyPromptReplacement( + trigger.rangeStart, + replacementRangeEnd, + replacement, + { expectedText: snapshot.value.slice(trigger.rangeStart, replacementRangeEnd) }, + ); + if (applied) { + setComposerHighlightedItemId(null); + } + return; + } onProviderModelSelect(item.provider, item.model); const applied = applyPromptReplacement(trigger.rangeStart, trigger.rangeEnd, "", { expectedText: snapshot.value.slice(trigger.rangeStart, trigger.rangeEnd), @@ -3190,6 +3294,98 @@ export default function ChatView({ threadId }: ChatViewProps) { } void onRevertToTurnCount(targetTurnCount); }; + const onUseSubagentReport = useCallback( + async (run: SubagentRun) => { + if (!activeThreadId) { + return; + } + const report = run.report; + if (!report) { + return; + } + const nextPrompt = [ + promptRef.current.trim(), + `Sub-agent report from ${run.skillTitle} for: ${run.task}`, + report.summary, + report.markdown, + ] + .filter((segment) => segment.length > 0) + .join("\n\n"); + promptRef.current = nextPrompt; + setComposerDraftPrompt(activeThreadId, nextPrompt); + setComposerCursor(collapseExpandedComposerCursor(nextPrompt, nextPrompt.length)); + setComposerTrigger(null); + const api = readNativeApi(); + if (!api) { + return; + } + await api.orchestration.dispatchCommand({ + type: "thread.subagent.acceptReport", + commandId: newCommandId(), + threadId: activeThreadId, + runId: run.id, + createdAt: new Date().toISOString(), + }); + }, + [activeThreadId, setComposerDraftPrompt], + ); + const onOpenSubagentWorktreeThread = useCallback( + async (run: SubagentRun) => { + const api = readNativeApi(); + if (!api || !activeProject || !activeThread || !run.branch || !run.worktreePath) { + return; + } + const existingThread = threads.find( + (thread) => + thread.id !== activeThread.id && + thread.projectId === activeProject.id && + thread.branch === run.branch && + thread.worktreePath === run.worktreePath, + ); + if (existingThread) { + void navigate({ + to: "/$threadId", + params: { threadId: existingThread.id }, + }); + return; + } + const nextThreadId = newThreadId(); + await api.orchestration.dispatchCommand({ + type: "thread.create", + commandId: newCommandId(), + threadId: nextThreadId, + projectId: activeProject.id, + title: truncateTitle(`${run.skillTitle} worktree`), + model: activeThread.model as ModelSlug, + runtimeMode: activeThread.runtimeMode, + interactionMode: "default", + branch: run.branch, + worktreePath: run.worktreePath, + createdAt: new Date().toISOString(), + }); + void navigate({ + to: "/$threadId", + params: { threadId: nextThreadId }, + }); + }, + [activeProject, activeThread, navigate, threads], + ); + const onDiscardSubagentRun = useCallback( + async (run: SubagentRun) => { + const api = readNativeApi(); + if (!api || !activeThreadId) { + return; + } + await api.orchestration.dispatchCommand({ + type: "thread.subagent.cleanup", + commandId: newCommandId(), + threadId: activeThreadId, + runId: run.id, + createdAt: new Date().toISOString(), + }); + }, + [activeThreadId], + ); // Empty state: no active thread if (!activeThread) { @@ -3297,6 +3493,9 @@ export default function ChatView({ threadId }: ChatViewProps) { onRevertUserMessage={onRevertUserMessage} isRevertingCheckpoint={isRevertingCheckpoint} onImageExpand={onExpandTimelineImage} + onUseSubagentReport={onUseSubagentReport} + onOpenSubagentWorktreeThread={onOpenSubagentWorktreeThread} + onDiscardSubagentRun={onDiscardSubagentRun} markdownCwd={gitCwd ?? undefined} resolvedTheme={resolvedTheme} timestampFormat={timestampFormat} diff --git a/apps/web/src/components/chat/ComposerCommandMenu.tsx b/apps/web/src/components/chat/ComposerCommandMenu.tsx index 818c3c20f8..cb0aa9122b 100644 --- a/apps/web/src/components/chat/ComposerCommandMenu.tsx +++ b/apps/web/src/components/chat/ComposerCommandMenu.tsx @@ -1,4 +1,9 @@ -import { type ProjectEntry, type ModelSlug, type ProviderKind } from "@t3tools/contracts"; +import { + type ProjectEntry, + type ModelSlug, + type ProviderKind, + type SubagentSkill, +} from "@t3tools/contracts"; import { memo } from "react"; import { type ComposerSlashCommand, type ComposerTriggerKind } from "../../composer-logic"; import { BotIcon } from "lucide-react"; @@ -30,6 +35,13 @@ export type ComposerCommandItem = model: ModelSlug; label: string; description: string; + } + | { + id: string; + type: "skill"; + skill: SubagentSkill; + label: string; + description: string; }; export const ComposerCommandMenu = memo(function ComposerCommandMenu(props: { @@ -111,6 +123,11 @@ const ComposerCommandMenuItem = memo(function ComposerCommandMenuItem(props: { model ) : null} + {props.item.type === "skill" ? ( + + skill + + ) : null} {props.item.label} diff --git a/apps/web/src/components/chat/MessagesTimeline.tsx b/apps/web/src/components/chat/MessagesTimeline.tsx index e30801041f..7f7998d667 100644 --- a/apps/web/src/components/chat/MessagesTimeline.tsx +++ b/apps/web/src/components/chat/MessagesTimeline.tsx @@ -1,4 +1,4 @@ -import { type MessageId, type TurnId } from "@t3tools/contracts"; +import { type MessageId, type SubagentRun, type TurnId } from "@t3tools/contracts"; import { memo, useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState } from "react"; import { measureElement as measureVirtualElement, @@ -36,6 +36,7 @@ import { computeMessageDurationStart, normalizeCompactToolLabel } from "./Messag import { cn } from "~/lib/utils"; import { type TimestampFormat } from "../../appSettings"; import { formatTimestamp } from "../../timestampFormat"; +import { SubagentReportCard } from "./SubagentReportCard"; const MAX_VISIBLE_WORK_LOG_ENTRIES = 6; const ALWAYS_UNVIRTUALIZED_TAIL_ROWS = 8; @@ -58,6 +59,9 @@ interface MessagesTimelineProps { onRevertUserMessage: (messageId: MessageId) => void; isRevertingCheckpoint: boolean; onImageExpand: (preview: ExpandedImagePreview) => void; + onUseSubagentReport: (run: SubagentRun) => void; + onOpenSubagentWorktreeThread: (run: SubagentRun) => void; + onDiscardSubagentRun: (run: SubagentRun) => void; markdownCwd: string | undefined; resolvedTheme: "light" | "dark"; timestampFormat: TimestampFormat; @@ -82,6 +86,9 @@ export const MessagesTimeline = memo(function MessagesTimeline({ onRevertUserMessage, isRevertingCheckpoint, onImageExpand, + onUseSubagentReport, + onOpenSubagentWorktreeThread, + onDiscardSubagentRun, markdownCwd, resolvedTheme, timestampFormat, @@ -156,6 +163,16 @@ export const MessagesTimeline = memo(function MessagesTimeline({ continue; } + if (timelineEntry.kind === "subagent-run") { + nextRows.push({ + kind: "subagent-run", + id: timelineEntry.id, + createdAt: timelineEntry.createdAt, + subagentRun: timelineEntry.subagentRun, + }); + continue; + } + nextRows.push({ kind: "message", id: timelineEntry.id, @@ -233,6 +250,7 @@ export const MessagesTimeline = memo(function MessagesTimeline({ if (!row) return 96; if (row.kind === "work") return 112; if (row.kind === "proposed-plan") return estimateTimelineProposedPlanHeight(row.proposedPlan); + if (row.kind === "subagent-run") return 260; if (row.kind === "working") return 40; return estimateTimelineMessageHeight(row.message, { timelineWidthPx }); }, @@ -509,6 +527,18 @@ export const MessagesTimeline = memo(function MessagesTimeline({ )} + {row.kind === "subagent-run" && ( +
+ +
+ )} + {row.kind === "working" && (
@@ -576,6 +606,7 @@ type TimelineEntry = ReturnType[number]; type TimelineMessage = Extract["message"]; type TimelineProposedPlan = Extract["proposedPlan"]; type TimelineWorkEntry = Extract["entry"]; +type TimelineSubagentRun = Extract["subagentRun"]; type TimelineRow = | { kind: "work"; @@ -597,6 +628,12 @@ type TimelineRow = createdAt: string; proposedPlan: TimelineProposedPlan; } + | { + kind: "subagent-run"; + id: string; + createdAt: string; + subagentRun: TimelineSubagentRun; + } | { kind: "working"; id: string; createdAt: string | null }; function estimateTimelineProposedPlanHeight(proposedPlan: TimelineProposedPlan): number { diff --git a/apps/web/src/components/chat/SubagentReportCard.tsx b/apps/web/src/components/chat/SubagentReportCard.tsx new file mode 100644 index 0000000000..e1fba5c625 --- /dev/null +++ b/apps/web/src/components/chat/SubagentReportCard.tsx @@ -0,0 +1,84 @@ +import { type SubagentRun } from "@t3tools/contracts"; + +import ChatMarkdown from "../ChatMarkdown"; +import { Button } from "../ui/button"; + +function statusLabel(status: SubagentRun["status"]): string { + switch (status) { + case "preparing": + return "Preparing specialist"; + case "running": + return "Running specialist"; + case "report_ready": + return "Report ready"; + case "accepted": + return "Report accepted"; + case "retained": + return "Worktree retained"; + case "cleaned_up": + return "Cleaned up"; + case "cleanup_failed": + return "Cleanup failed"; + case "failed": + return "Failed"; + } +} + +export function SubagentReportCard(props: { + run: SubagentRun; + markdownCwd: string | undefined; + onUseReport: (run: SubagentRun) => void; + onOpenWorktreeThread: (run: SubagentRun) => void; + onDiscard: (run: SubagentRun) => void; +}) { + const canUseReport = props.run.report !== null; + const canOpenWorktree = !!props.run.worktreePath && !!props.run.branch; + + return ( +
+
+
+

+ Specialist +

+

{props.run.skillTitle}

+

{statusLabel(props.run.status)}

+
+

{props.run.task}

+
+ {props.run.report ? ( +
+

{props.run.report.summary}

+
+ +
+
+ ) : null} + {props.run.lastError ? ( +

{props.run.lastError}

+ ) : null} +
+ + + +
+
+ ); +} diff --git a/apps/web/src/composer-logic.ts b/apps/web/src/composer-logic.ts index b696d80381..4eb9c2e3ab 100644 --- a/apps/web/src/composer-logic.ts +++ b/apps/web/src/composer-logic.ts @@ -1,7 +1,12 @@ import { splitPromptIntoComposerSegments } from "./composer-editor-mentions"; -export type ComposerTriggerKind = "path" | "slash-command" | "slash-model"; -export type ComposerSlashCommand = "model" | "plan" | "default"; +export type ComposerTriggerKind = "path" | "slash-command" | "slash-model" | "slash-skill"; +export type ComposerSlashCommand = "model" | "plan" | "default" | "skill"; + +export interface ParsedSkillInvocation { + skillId: string; + task: string; +} export interface ComposerTrigger { kind: ComposerTriggerKind; @@ -10,7 +15,7 @@ export interface ComposerTrigger { rangeEnd: number; } -const SLASH_COMMANDS: readonly ComposerSlashCommand[] = ["model", "plan", "default"]; +const SLASH_COMMANDS: readonly ComposerSlashCommand[] = ["model", "plan", "default", "skill"]; function clampCursor(text: string, cursor: number): number { if (!Number.isFinite(cursor)) return text.length; @@ -188,6 +193,16 @@ export function detectComposerTrigger(text: string, cursorInput: number): Compos rangeEnd: cursor, }; } + + const skillMatch = /^\/skill(?:\s+([^\s]*))?$/.exec(linePrefix); + if (skillMatch) { + return { + kind: "slash-skill", + query: (skillMatch[1] ?? "").trim(), + rangeStart: lineStart, + rangeEnd: cursor, + }; + } } const tokenStart = tokenStartForCursor(text, cursor); @@ -206,7 +221,7 @@ export function detectComposerTrigger(text: string, cursorInput: number): Compos export function parseStandaloneComposerSlashCommand( text: string, -): Exclude | null { +): Exclude | null { const match = /^\/(plan|default)\s*$/i.exec(text.trim()); if (!match) { return null; @@ -216,6 +231,19 @@ export function parseStandaloneComposerSlashCommand( return "default"; } +export function parseSkillInvocation(text: string): ParsedSkillInvocation | null { + const match = /^\/skill\s+(\S+)\s+([\s\S]+)$/i.exec(text.trim()); + if (!match) { + return null; + } + const skillId = match[1]?.trim() ?? ""; + const task = match[2]?.trim() ?? ""; + if (skillId.length === 0 || task.length === 0) { + return null; + } + return { skillId, task }; +} + export function replaceTextRange( text: string, rangeStart: number, diff --git a/apps/web/src/session-logic.ts b/apps/web/src/session-logic.ts index e389f10e2d..5fe2b3ca3f 100644 --- a/apps/web/src/session-logic.ts +++ b/apps/web/src/session-logic.ts @@ -5,6 +5,7 @@ import { type OrchestrationThreadActivity, type OrchestrationProposedPlanId, type ProviderKind, + type SubagentRun, type ToolLifecycleItemType, type UserInputQuestion, type TurnId, @@ -92,6 +93,12 @@ export type TimelineEntry = kind: "work"; createdAt: string; entry: WorkLogEntry; + } + | { + id: string; + kind: "subagent-run"; + createdAt: string; + subagentRun: SubagentRun; }; export function formatDuration(durationMs: number): string { @@ -634,6 +641,7 @@ export function deriveTimelineEntries( messages: ChatMessage[], proposedPlans: ProposedPlan[], workEntries: WorkLogEntry[], + subagentRuns: ReadonlyArray = [], ): TimelineEntry[] { const messageRows: TimelineEntry[] = messages.map((message) => ({ id: message.id, @@ -653,7 +661,13 @@ export function deriveTimelineEntries( createdAt: entry.createdAt, entry, })); - return [...messageRows, ...proposedPlanRows, ...workRows].toSorted((a, b) => + const subagentRows: TimelineEntry[] = subagentRuns.map((subagentRun) => ({ + id: subagentRun.id, + kind: "subagent-run", + createdAt: subagentRun.createdAt, + subagentRun, + })); + return [...messageRows, ...proposedPlanRows, ...workRows, ...subagentRows].toSorted((a, b) => a.createdAt.localeCompare(b.createdAt), ); } diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index faebe4b0fb..d634c6b7d8 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -323,6 +323,7 @@ export function syncServerReadModel(state: AppState, readModel: OrchestrationRea files: checkpoint.files.map((file) => ({ ...file })), })), activities: thread.activities.map((activity) => ({ ...activity })), + subagentRuns: thread.subagentRuns?.map((run) => ({ ...run })) ?? [], }; }); return { diff --git a/apps/web/src/types.ts b/apps/web/src/types.ts index c071fb3f60..20b72b042b 100644 --- a/apps/web/src/types.ts +++ b/apps/web/src/types.ts @@ -12,6 +12,7 @@ import type { ProviderKind, ProviderInteractionMode, RuntimeMode, + SubagentRun, } from "@t3tools/contracts"; export type SessionPhase = "disconnected" | "connecting" | "ready" | "running"; @@ -102,6 +103,7 @@ export interface Thread { worktreePath: string | null; turnDiffSummaries: TurnDiffSummary[]; activities: OrchestrationThreadActivity[]; + subagentRuns?: SubagentRun[] | undefined; } export interface ThreadSession { diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index 0f37a93515..952b840b22 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -11,3 +11,4 @@ export * from "./git"; export * from "./orchestration"; export * from "./editor"; export * from "./project"; +export * from "./subagent"; diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 17c5eb21d6..b26bf39f16 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -1,5 +1,6 @@ import { Option, Schema, SchemaIssue, Struct } from "effect"; import { ProviderModelOptions } from "./model"; +import { SubagentRun, SubagentRunId } from "./subagent"; import { ApprovalRequestId, CheckpointRef, @@ -56,6 +57,8 @@ export const DEFAULT_RUNTIME_MODE: RuntimeMode = "full-access"; export const ProviderInteractionMode = Schema.Literals(["default", "plan"]); export type ProviderInteractionMode = typeof ProviderInteractionMode.Type; export const DEFAULT_PROVIDER_INTERACTION_MODE: ProviderInteractionMode = "default"; +export const OrchestrationThreadKind = Schema.Literals(["primary", "subagent"]); +export type OrchestrationThreadKind = typeof OrchestrationThreadKind.Type; export const ProviderRequestKind = Schema.Literals(["command", "file-read", "file-change"]); export type ProviderRequestKind = typeof ProviderRequestKind.Type; export const AssistantDeliveryMode = Schema.Literals(["buffered", "streaming"]); @@ -259,6 +262,12 @@ export const OrchestrationThread = Schema.Struct({ interactionMode: ProviderInteractionMode.pipe( Schema.withDecodingDefault(() => DEFAULT_PROVIDER_INTERACTION_MODE), ), + threadKind: Schema.optional( + OrchestrationThreadKind.pipe(Schema.withDecodingDefault(() => "primary")), + ), + parentThreadId: Schema.optional( + Schema.NullOr(ThreadId).pipe(Schema.withDecodingDefault(() => null)), + ), branch: Schema.NullOr(TrimmedNonEmptyString), worktreePath: Schema.NullOr(TrimmedNonEmptyString), latestTurn: Schema.NullOr(OrchestrationLatestTurn), @@ -269,6 +278,9 @@ export const OrchestrationThread = Schema.Struct({ proposedPlans: Schema.Array(OrchestrationProposedPlan).pipe(Schema.withDecodingDefault(() => [])), activities: Schema.Array(OrchestrationThreadActivity), checkpoints: Schema.Array(OrchestrationCheckpointSummary), + subagentRuns: Schema.optional( + Schema.Array(SubagentRun).pipe(Schema.withDecodingDefault(() => [])), + ), session: Schema.NullOr(OrchestrationSession), }); export type OrchestrationThread = typeof OrchestrationThread.Type; @@ -318,6 +330,12 @@ const ThreadCreateCommand = Schema.Struct({ interactionMode: ProviderInteractionMode.pipe( Schema.withDecodingDefault(() => DEFAULT_PROVIDER_INTERACTION_MODE), ), + threadKind: Schema.optional( + OrchestrationThreadKind.pipe(Schema.withDecodingDefault(() => "primary")), + ), + parentThreadId: Schema.optional( + Schema.NullOr(ThreadId).pipe(Schema.withDecodingDefault(() => null)), + ), branch: Schema.NullOr(TrimmedNonEmptyString), worktreePath: Schema.NullOr(TrimmedNonEmptyString), createdAt: IsoDateTime, @@ -374,6 +392,7 @@ export const ThreadTurnStartCommand = Schema.Struct({ interactionMode: ProviderInteractionMode.pipe( Schema.withDecodingDefault(() => DEFAULT_PROVIDER_INTERACTION_MODE), ), + developerInstructions: Schema.optional(TrimmedNonEmptyString), createdAt: IsoDateTime, }); @@ -438,6 +457,32 @@ const ThreadSessionStopCommand = Schema.Struct({ createdAt: IsoDateTime, }); +const ThreadSubagentStartCommand = Schema.Struct({ + type: Schema.Literal("thread.subagent.start"), + commandId: CommandId, + threadId: ThreadId, + runId: SubagentRunId, + skillId: TrimmedNonEmptyString, + task: TrimmedNonEmptyString, + createdAt: IsoDateTime, +}); + +const ThreadSubagentAcceptReportCommand = Schema.Struct({ + type: Schema.Literal("thread.subagent.acceptReport"), + commandId: CommandId, + threadId: ThreadId, + runId: SubagentRunId, + createdAt: IsoDateTime, +}); + +const ThreadSubagentCleanupCommand = Schema.Struct({ + type: Schema.Literal("thread.subagent.cleanup"), + commandId: CommandId, + threadId: ThreadId, + runId: SubagentRunId, + createdAt: IsoDateTime, +}); + const DispatchableClientOrchestrationCommand = Schema.Union([ ProjectCreateCommand, ProjectMetaUpdateCommand, @@ -453,6 +498,9 @@ const DispatchableClientOrchestrationCommand = Schema.Union([ ThreadUserInputRespondCommand, ThreadCheckpointRevertCommand, ThreadSessionStopCommand, + ThreadSubagentStartCommand, + ThreadSubagentAcceptReportCommand, + ThreadSubagentCleanupCommand, ]); export type DispatchableClientOrchestrationCommand = typeof DispatchableClientOrchestrationCommand.Type; @@ -472,6 +520,9 @@ export const ClientOrchestrationCommand = Schema.Union([ ThreadUserInputRespondCommand, ThreadCheckpointRevertCommand, ThreadSessionStopCommand, + ThreadSubagentStartCommand, + ThreadSubagentAcceptReportCommand, + ThreadSubagentCleanupCommand, ]); export type ClientOrchestrationCommand = typeof ClientOrchestrationCommand.Type; @@ -540,6 +591,14 @@ const ThreadRevertCompleteCommand = Schema.Struct({ createdAt: IsoDateTime, }); +const ThreadSubagentUpsertCommand = Schema.Struct({ + type: Schema.Literal("thread.subagent.upsert"), + commandId: CommandId, + threadId: ThreadId, + subagentRun: SubagentRun, + createdAt: IsoDateTime, +}); + const InternalOrchestrationCommand = Schema.Union([ ThreadSessionSetCommand, ThreadMessageAssistantDeltaCommand, @@ -548,6 +607,7 @@ const InternalOrchestrationCommand = Schema.Union([ ThreadTurnDiffCompleteCommand, ThreadActivityAppendCommand, ThreadRevertCompleteCommand, + ThreadSubagentUpsertCommand, ]); export type InternalOrchestrationCommand = typeof InternalOrchestrationCommand.Type; @@ -574,10 +634,14 @@ export const OrchestrationEventType = Schema.Literals([ "thread.checkpoint-revert-requested", "thread.reverted", "thread.session-stop-requested", + "thread.subagent-start-requested", + "thread.subagent-report-accepted", + "thread.subagent-cleanup-requested", "thread.session-set", "thread.proposed-plan-upserted", "thread.turn-diff-completed", "thread.activity-appended", + "thread.subagent-upserted", ]); export type OrchestrationEventType = typeof OrchestrationEventType.Type; @@ -618,6 +682,12 @@ export const ThreadCreatedPayload = Schema.Struct({ interactionMode: ProviderInteractionMode.pipe( Schema.withDecodingDefault(() => DEFAULT_PROVIDER_INTERACTION_MODE), ), + threadKind: Schema.optional( + OrchestrationThreadKind.pipe(Schema.withDecodingDefault(() => "primary")), + ), + parentThreadId: Schema.optional( + Schema.NullOr(ThreadId).pipe(Schema.withDecodingDefault(() => null)), + ), branch: Schema.NullOr(TrimmedNonEmptyString), worktreePath: Schema.NullOr(TrimmedNonEmptyString), createdAt: IsoDateTime, @@ -676,6 +746,7 @@ export const ThreadTurnStartRequestedPayload = Schema.Struct({ interactionMode: ProviderInteractionMode.pipe( Schema.withDecodingDefault(() => DEFAULT_PROVIDER_INTERACTION_MODE), ), + developerInstructions: Schema.optional(TrimmedNonEmptyString), createdAt: IsoDateTime, }); @@ -715,6 +786,26 @@ export const ThreadSessionStopRequestedPayload = Schema.Struct({ createdAt: IsoDateTime, }); +export const ThreadSubagentStartRequestedPayload = Schema.Struct({ + threadId: ThreadId, + runId: SubagentRunId, + skillId: TrimmedNonEmptyString, + task: TrimmedNonEmptyString, + createdAt: IsoDateTime, +}); + +export const ThreadSubagentReportAcceptedPayload = Schema.Struct({ + threadId: ThreadId, + runId: SubagentRunId, + createdAt: IsoDateTime, +}); + +export const ThreadSubagentCleanupRequestedPayload = Schema.Struct({ + threadId: ThreadId, + runId: SubagentRunId, + createdAt: IsoDateTime, +}); + export const ThreadSessionSetPayload = Schema.Struct({ threadId: ThreadId, session: OrchestrationSession, @@ -741,6 +832,11 @@ export const ThreadActivityAppendedPayload = Schema.Struct({ activity: OrchestrationThreadActivity, }); +export const ThreadSubagentUpsertedPayload = Schema.Struct({ + threadId: ThreadId, + subagentRun: SubagentRun, +}); + export const OrchestrationEventMetadata = Schema.Struct({ providerTurnId: Schema.optional(TrimmedNonEmptyString), providerItemId: Schema.optional(ProviderItemId), @@ -843,6 +939,21 @@ export const OrchestrationEvent = Schema.Union([ type: Schema.Literal("thread.session-stop-requested"), payload: ThreadSessionStopRequestedPayload, }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.subagent-start-requested"), + payload: ThreadSubagentStartRequestedPayload, + }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.subagent-report-accepted"), + payload: ThreadSubagentReportAcceptedPayload, + }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.subagent-cleanup-requested"), + payload: ThreadSubagentCleanupRequestedPayload, + }), Schema.Struct({ ...EventBaseFields, type: Schema.Literal("thread.session-set"), @@ -863,6 +974,11 @@ export const OrchestrationEvent = Schema.Union([ type: Schema.Literal("thread.activity-appended"), payload: ThreadActivityAppendedPayload, }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.subagent-upserted"), + payload: ThreadSubagentUpsertedPayload, + }), ]); export type OrchestrationEvent = typeof OrchestrationEvent.Type; diff --git a/packages/contracts/src/provider.ts b/packages/contracts/src/provider.ts index 9d2a198b6d..34d35831e5 100644 --- a/packages/contracts/src/provider.ts +++ b/packages/contracts/src/provider.ts @@ -82,6 +82,7 @@ export const ProviderSendTurnInput = Schema.Struct({ model: Schema.optional(TrimmedNonEmptyStringSchema), modelOptions: Schema.optional(ProviderModelOptions), interactionMode: Schema.optional(ProviderInteractionMode), + developerInstructions: Schema.optional(TrimmedNonEmptyStringSchema), }); export type ProviderSendTurnInput = typeof ProviderSendTurnInput.Type; diff --git a/packages/contracts/src/server.ts b/packages/contracts/src/server.ts index 96ea90c1f5..73c24748a4 100644 --- a/packages/contracts/src/server.ts +++ b/packages/contracts/src/server.ts @@ -3,6 +3,7 @@ import { IsoDateTime, TrimmedNonEmptyString } from "./baseSchemas"; import { KeybindingRule, ResolvedKeybindingsConfig } from "./keybindings"; import { EditorId } from "./editor"; import { ProviderKind } from "./orchestration"; +import { SubagentSkill } from "./subagent"; const KeybindingsMalformedConfigIssue = Schema.Struct({ kind: Schema.Literal("keybindings.malformed-config"), @@ -52,6 +53,9 @@ export const ServerConfig = Schema.Struct({ issues: ServerConfigIssues, providers: ServerProviderStatuses, availableEditors: Schema.Array(EditorId), + subagentSkills: Schema.optional( + Schema.Array(SubagentSkill).pipe(Schema.withDecodingDefault(() => [])), + ), }); export type ServerConfig = typeof ServerConfig.Type; diff --git a/packages/contracts/src/subagent.ts b/packages/contracts/src/subagent.ts new file mode 100644 index 0000000000..5ffcd1a96d --- /dev/null +++ b/packages/contracts/src/subagent.ts @@ -0,0 +1,59 @@ +import { Schema } from "effect"; + +import { IsoDateTime, ThreadId, TrimmedNonEmptyString } from "./baseSchemas"; + +export const SubagentRunId = TrimmedNonEmptyString; +export type SubagentRunId = typeof SubagentRunId.Type; + +export const SubagentSkill = Schema.Struct({ + id: TrimmedNonEmptyString, + title: TrimmedNonEmptyString, + path: TrimmedNonEmptyString, + promptMarkdown: TrimmedNonEmptyString, + summary: Schema.optional(TrimmedNonEmptyString), +}); +export type SubagentSkill = typeof SubagentSkill.Type; + +export const SubagentRunStatus = Schema.Literals([ + "preparing", + "running", + "report_ready", + "accepted", + "retained", + "cleaned_up", + "failed", + "cleanup_failed", +]); +export type SubagentRunStatus = typeof SubagentRunStatus.Type; + +export const SubagentReport = Schema.Struct({ + summary: TrimmedNonEmptyString, + markdown: TrimmedNonEmptyString, + findings: Schema.Array(TrimmedNonEmptyString).pipe(Schema.withDecodingDefault(() => [])), + actionsTaken: Schema.Array(TrimmedNonEmptyString).pipe(Schema.withDecodingDefault(() => [])), + recommendedActions: Schema.Array(TrimmedNonEmptyString).pipe( + Schema.withDecodingDefault(() => []), + ), + filesChanged: Schema.Array(TrimmedNonEmptyString).pipe(Schema.withDecodingDefault(() => [])), + generatedAt: IsoDateTime, +}); +export type SubagentReport = typeof SubagentReport.Type; + +export const SubagentRun = Schema.Struct({ + id: SubagentRunId, + parentThreadId: ThreadId, + subagentThreadId: Schema.NullOr(ThreadId), + skillId: TrimmedNonEmptyString, + skillTitle: TrimmedNonEmptyString, + task: TrimmedNonEmptyString, + status: SubagentRunStatus, + branch: Schema.NullOr(TrimmedNonEmptyString), + worktreePath: Schema.NullOr(TrimmedNonEmptyString), + report: Schema.NullOr(SubagentReport), + lastError: Schema.NullOr(TrimmedNonEmptyString), + createdAt: IsoDateTime, + updatedAt: IsoDateTime, + completedAt: Schema.NullOr(IsoDateTime), + acceptedAt: Schema.NullOr(IsoDateTime), +}); +export type SubagentRun = typeof SubagentRun.Type;