diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 6ae94105a..2b9d397ff 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -653,6 +653,8 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { threadId: event.payload.threadId, turnId: event.payload.proposedPlan.turnId, planMarkdown: event.payload.proposedPlan.planMarkdown, + implementedAt: event.payload.proposedPlan.implementedAt, + implementationThreadId: event.payload.proposedPlan.implementationThreadId, createdAt: event.payload.proposedPlan.createdAt, updatedAt: event.payload.proposedPlan.updatedAt, }); @@ -775,11 +777,23 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { yield* projectionTurnRepository.replacePendingTurnStart({ threadId: event.payload.threadId, messageId: event.payload.messageId, + sourceProposedPlanThreadId: event.payload.sourceProposedPlan?.threadId ?? null, + sourceProposedPlanId: event.payload.sourceProposedPlan?.planId ?? null, requestedAt: event.payload.createdAt, }); return; } + case "thread.activity-appended": { + if (event.payload.activity.kind !== "provider.turn.start.failed") { + return; + } + yield* projectionTurnRepository.deletePendingTurnStartByThreadId({ + threadId: event.payload.threadId, + }); + return; + } + case "thread.session-set": { const turnId = event.payload.session.activeTurnId; if (turnId === null || event.payload.session.status !== "running") { diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index fc7db5480..9bfe581de 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -26,6 +26,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { yield* sql`DELETE FROM projection_projects`; yield* sql`DELETE FROM projection_state`; + yield* sql`DELETE FROM projection_thread_proposed_plans`; yield* sql`DELETE FROM projection_turns`; yield* sql` @@ -101,6 +102,29 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { ) `; + yield* sql` + INSERT INTO projection_thread_proposed_plans ( + plan_id, + thread_id, + turn_id, + plan_markdown, + implemented_at, + implementation_thread_id, + created_at, + updated_at + ) + VALUES ( + 'plan-1', + 'thread-1', + 'turn-1', + '# Ship it', + '2026-02-24T00:00:05.500Z', + 'thread-2', + '2026-02-24T00:00:05.000Z', + '2026-02-24T00:00:05.500Z' + ) + `; + yield* sql` INSERT INTO projection_thread_activities ( activity_id, @@ -253,7 +277,17 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { updatedAt: "2026-02-24T00:00:05.000Z", }, ], - proposedPlans: [], + proposedPlans: [ + { + id: "plan-1", + turnId: asTurnId("turn-1"), + planMarkdown: "# Ship it", + implementedAt: "2026-02-24T00:00:05.500Z", + implementationThreadId: ThreadId.makeUnsafe("thread-2"), + createdAt: "2026-02-24T00:00:05.000Z", + updatedAt: "2026-02-24T00:00:05.500Z", + }, + ], activities: [ { id: asEventId("activity-1"), diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 5fd38a540..1b5134fbc 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -200,6 +200,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { thread_id AS "threadId", turn_id AS "turnId", plan_markdown AS "planMarkdown", + implemented_at AS "implementedAt", + implementation_thread_id AS "implementationThreadId", created_at AS "createdAt", updated_at AS "updatedAt" FROM projection_thread_proposed_plans @@ -435,6 +437,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { id: row.planId, turnId: row.turnId, planMarkdown: row.planMarkdown, + implementedAt: row.implementedAt, + implementationThreadId: row.implementationThreadId, createdAt: row.createdAt, updatedAt: row.updatedAt, }); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index b6b48c7ed..7ca090a5c 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -86,11 +86,12 @@ async function waitForThread( engine: OrchestrationEngineShape, predicate: (thread: ProviderRuntimeTestThread) => boolean, timeoutMs = 2000, + threadId: ThreadId = asThreadId("thread-1"), ) { const deadline = Date.now() + timeoutMs; const poll = async (): Promise => { const readModel = await Effect.runPromise(engine.getReadModel()); - const thread = readModel.threads.find((entry) => entry.id === ThreadId.makeUnsafe("thread-1")); + const thread = readModel.threads.find((entry) => entry.id === threadId); if (thread && predicate(thread)) { return thread; } @@ -150,6 +151,7 @@ describe("ProviderRuntimeIngestion", () => { ); const layer = ProviderRuntimeIngestionLive.pipe( Layer.provideMerge(orchestrationLayer), + Layer.provideMerge(SqlitePersistenceMemory), Layer.provideMerge(Layer.succeed(ProviderService, provider.service)), Layer.provideMerge(ServerConfig.layerTest(process.cwd(), process.cwd())), Layer.provideMerge(NodeServices.layer), @@ -628,6 +630,334 @@ describe("ProviderRuntimeIngestion", () => { ); }); + it("marks the source proposed plan implemented only after the target turn starts", async () => { + const harness = await createHarness(); + const sourceThreadId = asThreadId("thread-plan"); + const targetThreadId = asThreadId("thread-implement"); + const sourceTurnId = asTurnId("turn-plan-source"); + const targetTurnId = asTurnId("turn-plan-implement"); + const createdAt = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-create-plan-source"), + threadId: sourceThreadId, + projectId: asProjectId("project-1"), + title: "Plan Source", + model: "gpt-5-codex", + interactionMode: "plan", + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt, + }), + ); + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.makeUnsafe("cmd-session-set-plan-source"), + threadId: sourceThreadId, + session: { + threadId: sourceThreadId, + status: "ready", + providerName: "codex", + runtimeMode: "approval-required", + activeTurnId: null, + updatedAt: createdAt, + lastError: null, + }, + createdAt, + }), + ); + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-create-plan-target"), + threadId: targetThreadId, + projectId: asProjectId("project-1"), + title: "Plan Target", + model: "gpt-5-codex", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt, + }), + ); + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.makeUnsafe("cmd-session-set-plan-target"), + threadId: targetThreadId, + session: { + threadId: targetThreadId, + status: "ready", + providerName: "codex", + runtimeMode: "approval-required", + activeTurnId: null, + updatedAt: createdAt, + lastError: null, + }, + createdAt, + }), + ); + + harness.emit({ + type: "turn.proposed.completed", + eventId: asEventId("evt-plan-source-completed"), + provider: "codex", + createdAt, + threadId: sourceThreadId, + turnId: sourceTurnId, + payload: { + planMarkdown: "# Source plan", + }, + }); + + const sourceThreadWithPlan = await waitForThread( + harness.engine, + (thread) => + thread.proposedPlans.some( + (proposedPlan: ProviderRuntimeTestProposedPlan) => + proposedPlan.id === "plan:thread-plan:turn:turn-plan-source" && + proposedPlan.implementedAt === null, + ), + 2_000, + sourceThreadId, + ); + const sourcePlan = sourceThreadWithPlan.proposedPlans.find( + (entry: ProviderRuntimeTestProposedPlan) => + entry.id === "plan:thread-plan:turn:turn-plan-source", + ); + expect(sourcePlan).toBeDefined(); + if (!sourcePlan) { + throw new Error("Expected source plan to exist."); + } + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-plan-target"), + threadId: targetThreadId, + message: { + messageId: asMessageId("msg-plan-target"), + role: "user", + text: "PLEASE IMPLEMENT THIS PLAN:\n# Source plan", + attachments: [], + }, + sourceProposedPlan: { + threadId: sourceThreadId, + planId: sourcePlan.id, + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: new Date().toISOString(), + }), + ); + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-plan-target-overlap"), + threadId: targetThreadId, + message: { + messageId: asMessageId("msg-plan-target-overlap"), + role: "user", + text: "follow-up before the first turn starts", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: new Date().toISOString(), + }), + ); + + const sourceThreadBeforeStart = await waitForThread( + harness.engine, + (thread) => + thread.proposedPlans.some( + (proposedPlan: ProviderRuntimeTestProposedPlan) => + proposedPlan.id === sourcePlan.id && proposedPlan.implementedAt === null, + ), + 2_000, + sourceThreadId, + ); + expect( + sourceThreadBeforeStart.proposedPlans.find((entry) => entry.id === sourcePlan.id), + ).toMatchObject({ + implementedAt: null, + implementationThreadId: null, + }); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-plan-target-started"), + provider: "codex", + createdAt: new Date().toISOString(), + threadId: targetThreadId, + turnId: targetTurnId, + }); + + const sourceThreadAfterStart = await waitForThread( + harness.engine, + (thread) => + thread.proposedPlans.some( + (proposedPlan: ProviderRuntimeTestProposedPlan) => + proposedPlan.id === sourcePlan.id && + proposedPlan.implementedAt !== null && + proposedPlan.implementationThreadId === targetThreadId, + ), + 2_000, + sourceThreadId, + ); + expect( + sourceThreadAfterStart.proposedPlans.find((entry) => entry.id === sourcePlan.id), + ).toMatchObject({ + implementationThreadId: "thread-implement", + }); + }); + + it("does not mark the source proposed plan implemented for a rejected turn.started event", async () => { + const harness = await createHarness(); + const sourceThreadId = asThreadId("thread-plan"); + const targetThreadId = asThreadId("thread-1"); + const sourceTurnId = asTurnId("turn-plan-source"); + const activeTurnId = asTurnId("turn-already-running"); + const staleTurnId = asTurnId("turn-stale-start"); + const createdAt = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.create", + commandId: CommandId.makeUnsafe("cmd-thread-create-plan-source-guarded"), + threadId: sourceThreadId, + projectId: asProjectId("project-1"), + title: "Plan Source", + model: "gpt-5-codex", + interactionMode: "plan", + runtimeMode: "approval-required", + branch: null, + worktreePath: null, + createdAt, + }), + ); + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.makeUnsafe("cmd-session-set-plan-source-guarded"), + threadId: sourceThreadId, + session: { + threadId: sourceThreadId, + status: "ready", + providerName: "codex", + runtimeMode: "approval-required", + activeTurnId: null, + updatedAt: createdAt, + lastError: null, + }, + createdAt, + }), + ); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-already-running"), + provider: "codex", + createdAt, + threadId: targetThreadId, + turnId: activeTurnId, + }); + + await waitForThread( + harness.engine, + (thread) => + thread.session?.status === "running" && thread.session?.activeTurnId === activeTurnId, + 2_000, + targetThreadId, + ); + + harness.emit({ + type: "turn.proposed.completed", + eventId: asEventId("evt-plan-source-completed-guarded"), + provider: "codex", + createdAt, + threadId: sourceThreadId, + turnId: sourceTurnId, + payload: { + planMarkdown: "# Source plan", + }, + }); + + const sourceThreadWithPlan = await waitForThread( + harness.engine, + (thread) => + thread.proposedPlans.some( + (proposedPlan: ProviderRuntimeTestProposedPlan) => + proposedPlan.id === "plan:thread-plan:turn:turn-plan-source" && + proposedPlan.implementedAt === null, + ), + 2_000, + sourceThreadId, + ); + const sourcePlan = sourceThreadWithPlan.proposedPlans.find( + (entry: ProviderRuntimeTestProposedPlan) => + entry.id === "plan:thread-plan:turn:turn-plan-source", + ); + expect(sourcePlan).toBeDefined(); + if (!sourcePlan) { + throw new Error("Expected source plan to exist."); + } + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-plan-target-guarded"), + threadId: targetThreadId, + message: { + messageId: asMessageId("msg-plan-target-guarded"), + role: "user", + text: "PLEASE IMPLEMENT THIS PLAN:\n# Source plan", + attachments: [], + }, + sourceProposedPlan: { + threadId: sourceThreadId, + planId: sourcePlan.id, + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: new Date().toISOString(), + }), + ); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-stale-plan-implementation"), + provider: "codex", + createdAt: new Date().toISOString(), + threadId: targetThreadId, + turnId: staleTurnId, + }); + + await harness.drain(); + + const readModel = await Effect.runPromise(harness.engine.getReadModel()); + const sourceThreadAfterRejectedStart = readModel.threads.find( + (entry) => entry.id === sourceThreadId, + ); + expect( + sourceThreadAfterRejectedStart?.proposedPlans.find((entry) => entry.id === sourcePlan.id), + ).toMatchObject({ + implementedAt: null, + implementationThreadId: null, + }); + + const targetThreadAfterRejectedStart = readModel.threads.find( + (entry) => entry.id === targetThreadId, + ); + expect(targetThreadAfterRejectedStart?.session?.status).toBe("running"); + expect(targetThreadAfterRejectedStart?.session?.activeTurnId).toBe(activeTurnId); + }); + it("finalizes buffered proposed-plan deltas into a first-class proposed plan on turn completion", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 417e93c8d..8dacb93cf 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -4,6 +4,7 @@ import { CommandId, MessageId, type OrchestrationEvent, + type OrchestrationProposedPlanId, CheckpointRef, isToolLifecycleItemType, ThreadId, @@ -15,6 +16,8 @@ import { Cache, Cause, Duration, Effect, Layer, Option, Ref, Stream } from "effe import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; +import { ProjectionTurnRepository } from "../../persistence/Services/ProjectionTurns.ts"; +import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts"; import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; import { isGitRepository } from "../../git/isRepo.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; @@ -480,6 +483,7 @@ function runtimeEventToActivities( const make = Effect.gen(function* () { const orchestrationEngine = yield* OrchestrationEngineService; const providerService = yield* ProviderService; + const projectionTurnRepository = yield* ProjectionTurnRepository; const assistantDeliveryModeRef = yield* Ref.make( DEFAULT_ASSISTANT_DELIVERY_MODE, @@ -671,6 +675,8 @@ const make = Effect.gen(function* () { threadProposedPlans: ReadonlyArray<{ id: string; createdAt: string; + implementedAt: string | null; + implementationThreadId: ThreadId | null; }>; planId: string; turnId?: TurnId; @@ -693,6 +699,8 @@ const make = Effect.gen(function* () { id: input.planId, turnId: input.turnId ?? null, planMarkdown, + implementedAt: existingPlan?.implementedAt ?? null, + implementationThreadId: existingPlan?.implementationThreadId ?? null, createdAt: existingPlan?.createdAt ?? input.createdAt, updatedAt: input.updatedAt, }, @@ -706,6 +714,8 @@ const make = Effect.gen(function* () { threadProposedPlans: ReadonlyArray<{ id: string; createdAt: string; + implementedAt: string | null; + implementationThreadId: ThreadId | null; }>; planId: string; turnId?: TurnId; @@ -772,6 +782,57 @@ const make = Effect.gen(function* () { ).pipe(Effect.asVoid); }); + const getSourceProposedPlanReferenceForPendingTurnStart = Effect.fnUntraced(function* ( + threadId: ThreadId, + ) { + const pendingTurnStart = yield* projectionTurnRepository.getPendingTurnStartByThreadId({ + threadId, + }); + if (Option.isNone(pendingTurnStart)) { + return null; + } + + const sourceThreadId = pendingTurnStart.value.sourceProposedPlanThreadId; + const sourcePlanId = pendingTurnStart.value.sourceProposedPlanId; + if (sourceThreadId === null || sourcePlanId === null) { + return null; + } + + return { + sourceThreadId, + sourcePlanId, + } as const; + }); + + const markSourceProposedPlanImplemented = Effect.fnUntraced(function* ( + sourceThreadId: ThreadId, + sourcePlanId: OrchestrationProposedPlanId, + implementationThreadId: ThreadId, + implementedAt: string, + ) { + const readModel = yield* orchestrationEngine.getReadModel(); + const sourceThread = readModel.threads.find((entry) => entry.id === sourceThreadId); + const sourcePlan = sourceThread?.proposedPlans.find((entry) => entry.id === sourcePlanId); + if (!sourceThread || !sourcePlan || sourcePlan.implementedAt !== null) { + return; + } + + yield* orchestrationEngine.dispatch({ + type: "thread.proposed-plan.upsert", + commandId: CommandId.makeUnsafe( + `provider:source-proposed-plan-implemented:${implementationThreadId}:${crypto.randomUUID()}`, + ), + threadId: sourceThread.id, + proposedPlan: { + ...sourcePlan, + implementedAt, + implementationThreadId, + updatedAt: implementedAt, + }, + createdAt: implementedAt, + }); + }); + const processRuntimeEvent = (event: ProviderRuntimeEvent) => Effect.gen(function* () { const readModel = yield* orchestrationEngine.getReadModel(); @@ -812,6 +873,10 @@ const make = Effect.gen(function* () { return true; } })(); + const acceptedTurnStartedSourcePlan = + event.type === "turn.started" && shouldApplyThreadLifecycle + ? yield* getSourceProposedPlanReferenceForPendingTurnStart(thread.id) + : null; if ( event.type === "session.started" || @@ -854,6 +919,26 @@ const make = Effect.gen(function* () { : (thread.session?.lastError ?? null); if (shouldApplyThreadLifecycle) { + if (event.type === "turn.started" && acceptedTurnStartedSourcePlan !== null) { + yield* markSourceProposedPlanImplemented( + acceptedTurnStartedSourcePlan.sourceThreadId, + acceptedTurnStartedSourcePlan.sourcePlanId, + thread.id, + now, + ).pipe( + Effect.catchCause((cause) => + Effect.logWarning( + "provider runtime ingestion failed to mark source proposed plan", + { + eventId: event.eventId, + eventType: event.type, + cause: Cause.pretty(cause), + }, + ), + ), + ); + } + yield* orchestrationEngine.dispatch({ type: "thread.session.set", commandId: providerCommandId(event, "thread-session-set"), @@ -1144,4 +1229,7 @@ const make = Effect.gen(function* () { } satisfies ProviderRuntimeIngestionShape; }); -export const ProviderRuntimeIngestionLive = Layer.effect(ProviderRuntimeIngestionService, make); +export const ProviderRuntimeIngestionLive = Layer.effect( + ProviderRuntimeIngestionService, + make, +).pipe(Layer.provide(ProjectionTurnRepositoryLive)); diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index eea41a2b3..6ea4c5175 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -262,11 +262,35 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" } case "thread.turn.start": { - yield* requireThread({ + const targetThread = yield* requireThread({ readModel, command, threadId: command.threadId, }); + const sourceProposedPlan = command.sourceProposedPlan; + const sourceThread = sourceProposedPlan + ? yield* requireThread({ + readModel, + command, + threadId: sourceProposedPlan.threadId, + }) + : null; + const sourcePlan = + sourceProposedPlan && sourceThread + ? sourceThread.proposedPlans.find((entry) => entry.id === sourceProposedPlan.planId) + : null; + if (sourceProposedPlan && !sourcePlan) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Proposed plan '${sourceProposedPlan.planId}' does not exist on thread '${sourceProposedPlan.threadId}'.`, + }); + } + if (sourceThread && sourceThread.projectId !== targetThread.projectId) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Proposed plan '${sourceProposedPlan?.planId}' belongs to thread '${sourceThread.id}' in a different project.`, + }); + } const userMessageEvent: Omit = { ...withEventBase({ aggregateKind: "thread", @@ -306,12 +330,9 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" ? { providerOptions: command.providerOptions } : {}), assistantDeliveryMode: command.assistantDeliveryMode ?? DEFAULT_ASSISTANT_DELIVERY_MODE, - runtimeMode: - readModel.threads.find((entry) => entry.id === command.threadId)?.runtimeMode ?? - command.runtimeMode, - interactionMode: - readModel.threads.find((entry) => entry.id === command.threadId)?.interactionMode ?? - command.interactionMode, + runtimeMode: targetThread.runtimeMode, + interactionMode: targetThread.interactionMode, + ...(sourceProposedPlan !== undefined ? { sourceProposedPlan } : {}), createdAt: command.createdAt, }, }; diff --git a/apps/server/src/persistence/Layers/ProjectionThreadProposedPlans.ts b/apps/server/src/persistence/Layers/ProjectionThreadProposedPlans.ts index 3d103592f..ccd322feb 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadProposedPlans.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadProposedPlans.ts @@ -22,6 +22,8 @@ const makeProjectionThreadProposedPlanRepository = Effect.gen(function* () { thread_id, turn_id, plan_markdown, + implemented_at, + implementation_thread_id, created_at, updated_at ) @@ -30,6 +32,8 @@ const makeProjectionThreadProposedPlanRepository = Effect.gen(function* () { ${row.threadId}, ${row.turnId}, ${row.planMarkdown}, + ${row.implementedAt}, + ${row.implementationThreadId}, ${row.createdAt}, ${row.updatedAt} ) @@ -38,6 +42,8 @@ const makeProjectionThreadProposedPlanRepository = Effect.gen(function* () { thread_id = excluded.thread_id, turn_id = excluded.turn_id, plan_markdown = excluded.plan_markdown, + implemented_at = excluded.implemented_at, + implementation_thread_id = excluded.implementation_thread_id, created_at = excluded.created_at, updated_at = excluded.updated_at `, @@ -52,6 +58,8 @@ const makeProjectionThreadProposedPlanRepository = Effect.gen(function* () { thread_id AS "threadId", turn_id AS "turnId", plan_markdown AS "planMarkdown", + implemented_at AS "implementedAt", + implementation_thread_id AS "implementationThreadId", created_at AS "createdAt", updated_at AS "updatedAt" FROM projection_thread_proposed_plans diff --git a/apps/server/src/persistence/Layers/ProjectionTurns.ts b/apps/server/src/persistence/Layers/ProjectionTurns.ts index 8330661e3..041cf783d 100644 --- a/apps/server/src/persistence/Layers/ProjectionTurns.ts +++ b/apps/server/src/persistence/Layers/ProjectionTurns.ts @@ -91,10 +91,16 @@ const makeProjectionTurnRepository = Effect.gen(function* () { execute: ({ threadId }) => sql` DELETE FROM projection_turns - WHERE thread_id = ${threadId} - AND turn_id IS NULL - AND state = 'pending' - AND checkpoint_turn_count IS NULL + WHERE row_id IN ( + SELECT row_id + FROM projection_turns + WHERE thread_id = ${threadId} + AND turn_id IS NULL + AND state = 'pending' + AND checkpoint_turn_count IS NULL + ORDER BY requested_at ASC, row_id ASC + LIMIT 1 + ) `, }); @@ -106,6 +112,8 @@ const makeProjectionTurnRepository = Effect.gen(function* () { thread_id, turn_id, pending_message_id, + source_proposed_plan_thread_id, + source_proposed_plan_id, assistant_message_id, state, requested_at, @@ -120,6 +128,8 @@ const makeProjectionTurnRepository = Effect.gen(function* () { ${row.threadId}, NULL, ${row.messageId}, + ${row.sourceProposedPlanThreadId}, + ${row.sourceProposedPlanId}, NULL, 'pending', ${row.requestedAt}, @@ -141,6 +151,8 @@ const makeProjectionTurnRepository = Effect.gen(function* () { SELECT thread_id AS "threadId", pending_message_id AS "messageId", + source_proposed_plan_thread_id AS "sourceProposedPlanThreadId", + source_proposed_plan_id AS "sourceProposedPlanId", requested_at AS "requestedAt" FROM projection_turns WHERE thread_id = ${threadId} @@ -148,7 +160,7 @@ const makeProjectionTurnRepository = Effect.gen(function* () { AND state = 'pending' AND pending_message_id IS NOT NULL AND checkpoint_turn_count IS NULL - ORDER BY requested_at DESC + ORDER BY requested_at ASC, row_id ASC LIMIT 1 `, }); @@ -245,20 +257,14 @@ const makeProjectionTurnRepository = Effect.gen(function* () { ); const replacePendingTurnStart: ProjectionTurnRepositoryShape["replacePendingTurnStart"] = (row) => - sql - .withTransaction( - clearPendingProjectionTurnsByThread({ threadId: row.threadId }).pipe( - Effect.flatMap(() => insertPendingProjectionTurn(row)), - ), - ) - .pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "ProjectionTurnRepository.replacePendingTurnStart:query", - "ProjectionTurnRepository.replacePendingTurnStart:encodeRequest", - ), + insertPendingProjectionTurn(row).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionTurnRepository.replacePendingTurnStart:query", + "ProjectionTurnRepository.replacePendingTurnStart:encodeRequest", ), - ); + ), + ); const getPendingTurnStartByThreadId: ProjectionTurnRepositoryShape["getPendingTurnStartByThreadId"] = (input) => diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index 7deb890dd..ea1821014 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -25,6 +25,8 @@ import Migration0010 from "./Migrations/010_ProjectionThreadsRuntimeMode.ts"; import Migration0011 from "./Migrations/011_OrchestrationThreadCreatedRuntimeMode.ts"; import Migration0012 from "./Migrations/012_ProjectionThreadsInteractionMode.ts"; import Migration0013 from "./Migrations/013_ProjectionThreadProposedPlans.ts"; +import Migration0014 from "./Migrations/014_ProjectionThreadProposedPlanImplementation.ts"; +import Migration0015 from "./Migrations/015_ProjectionTurnsSourceProposedPlan.ts"; import { Effect } from "effect"; /** @@ -51,6 +53,8 @@ const loader = Migrator.fromRecord({ "11_OrchestrationThreadCreatedRuntimeMode": Migration0011, "12_ProjectionThreadsInteractionMode": Migration0012, "13_ProjectionThreadProposedPlans": Migration0013, + "14_ProjectionThreadProposedPlanImplementation": Migration0014, + "15_ProjectionTurnsSourceProposedPlan": Migration0015, }); /** diff --git a/apps/server/src/persistence/Migrations/014_ProjectionThreadProposedPlanImplementation.ts b/apps/server/src/persistence/Migrations/014_ProjectionThreadProposedPlanImplementation.ts new file mode 100644 index 000000000..c7a82bfb3 --- /dev/null +++ b/apps/server/src/persistence/Migrations/014_ProjectionThreadProposedPlanImplementation.ts @@ -0,0 +1,16 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + ALTER TABLE projection_thread_proposed_plans + ADD COLUMN implemented_at TEXT + `; + + yield* sql` + ALTER TABLE projection_thread_proposed_plans + ADD COLUMN implementation_thread_id TEXT + `; +}); diff --git a/apps/server/src/persistence/Migrations/015_ProjectionTurnsSourceProposedPlan.ts b/apps/server/src/persistence/Migrations/015_ProjectionTurnsSourceProposedPlan.ts new file mode 100644 index 000000000..57a266187 --- /dev/null +++ b/apps/server/src/persistence/Migrations/015_ProjectionTurnsSourceProposedPlan.ts @@ -0,0 +1,16 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + ALTER TABLE projection_turns + ADD COLUMN source_proposed_plan_thread_id TEXT + `; + + yield* sql` + ALTER TABLE projection_turns + ADD COLUMN source_proposed_plan_id TEXT + `; +}); diff --git a/apps/server/src/persistence/Services/ProjectionThreadProposedPlans.ts b/apps/server/src/persistence/Services/ProjectionThreadProposedPlans.ts index ee662d52b..d141a11bb 100644 --- a/apps/server/src/persistence/Services/ProjectionThreadProposedPlans.ts +++ b/apps/server/src/persistence/Services/ProjectionThreadProposedPlans.ts @@ -15,6 +15,8 @@ export const ProjectionThreadProposedPlan = Schema.Struct({ threadId: ThreadId, turnId: Schema.NullOr(TurnId), planMarkdown: TrimmedNonEmptyString, + implementedAt: Schema.NullOr(IsoDateTime), + implementationThreadId: Schema.NullOr(ThreadId), createdAt: IsoDateTime, updatedAt: IsoDateTime, }); diff --git a/apps/server/src/persistence/Services/ProjectionTurns.ts b/apps/server/src/persistence/Services/ProjectionTurns.ts index 1c791342f..f90c6d6a3 100644 --- a/apps/server/src/persistence/Services/ProjectionTurns.ts +++ b/apps/server/src/persistence/Services/ProjectionTurns.ts @@ -11,6 +11,7 @@ import { IsoDateTime, MessageId, NonNegativeInt, + OrchestrationProposedPlanId, OrchestrationCheckpointFile, OrchestrationCheckpointStatus, ThreadId, @@ -65,6 +66,8 @@ export type ProjectionTurnById = typeof ProjectionTurnById.Type; export const ProjectionPendingTurnStart = Schema.Struct({ threadId: ThreadId, messageId: MessageId, + sourceProposedPlanThreadId: Schema.NullOr(ThreadId), + sourceProposedPlanId: Schema.NullOr(OrchestrationProposedPlanId), requestedAt: IsoDateTime, }); export type ProjectionPendingTurnStart = typeof ProjectionPendingTurnStart.Type; @@ -106,21 +109,21 @@ export interface ProjectionTurnRepositoryShape { ) => Effect.Effect; /** - * Replaces any existing pending-start placeholder rows for a thread with exactly one latest pending-start row. + * Stores a pending-start placeholder row for a thread. */ readonly replacePendingTurnStart: ( row: ProjectionPendingTurnStart, ) => Effect.Effect; /** - * Returns the newest pending-start placeholder for a thread; this is expected to be at most one row after replacement writes. + * Returns the oldest pending-start placeholder for a thread. */ readonly getPendingTurnStartByThreadId: ( input: GetProjectionPendingTurnStartInput, ) => Effect.Effect, ProjectionRepositoryError>; /** - * Deletes only pending-start placeholder rows (`turnId = null`) for a thread and leaves concrete turn rows untouched. + * Deletes the oldest pending-start placeholder row for a thread and leaves concrete turn rows untouched. */ readonly deletePendingTurnStartByThreadId: ( input: GetProjectionPendingTurnStartInput, diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index faecc7f51..e576e18e3 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -345,6 +345,8 @@ function createSnapshotWithLongProposedPlan(): OrchestrationReadModel { id: "plan-browser-test", turnId: null, planMarkdown, + implementedAt: null, + implementationThreadId: null, createdAt: isoAt(1_000), updatedAt: isoAt(1_001), }, diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 52637695e..2d646a4cf 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -55,6 +55,7 @@ import { deriveActivePlanState, findLatestProposedPlan, deriveWorkLogEntries, + hasActionableProposedPlan, hasToolActivityForTurn, isLatestTurnSettled, formatElapsed, @@ -643,7 +644,7 @@ export default function ChatView({ threadId }: ChatViewProps) { pendingUserInputs.length === 0 && interactionMode === "plan" && latestTurnSettled && - activeProposedPlan !== null; + hasActionableProposedPlan(activeProposedPlan); const activePendingApproval = pendingApprovals[0] ?? null; const isComposerApprovalState = activePendingApproval !== null; const hasComposerHeader = @@ -2818,6 +2819,10 @@ export default function ChatView({ threadId }: ChatViewProps) { assistantDeliveryMode: settings.enableAssistantStreaming ? "streaming" : "buffered", runtimeMode, interactionMode: "default", + sourceProposedPlan: { + threadId: activeThread.id, + planId: activeProposedPlan.id, + }, createdAt, }); }) diff --git a/apps/web/src/components/Sidebar.logic.test.ts b/apps/web/src/components/Sidebar.logic.test.ts index a8f84d564..8c3b47010 100644 --- a/apps/web/src/components/Sidebar.logic.test.ts +++ b/apps/web/src/components/Sidebar.logic.test.ts @@ -141,6 +141,8 @@ describe("resolveThreadStatusPill", () => { createdAt: "2026-03-09T10:00:00.000Z", updatedAt: "2026-03-09T10:05:00.000Z", planMarkdown: "# Plan", + implementedAt: null, + implementationThreadId: null, }, ], session: { @@ -155,6 +157,35 @@ describe("resolveThreadStatusPill", () => { ).toMatchObject({ label: "Plan Ready", pulse: false }); }); + it("does not show plan ready after the proposed plan was implemented elsewhere", () => { + expect( + resolveThreadStatusPill({ + thread: { + ...baseThread, + latestTurn: makeLatestTurn(), + proposedPlans: [ + { + id: "plan-1" as never, + turnId: "turn-1" as never, + createdAt: "2026-03-09T10:00:00.000Z", + updatedAt: "2026-03-09T10:05:00.000Z", + planMarkdown: "# Plan", + implementedAt: "2026-03-09T10:06:00.000Z", + implementationThreadId: "thread-implement" as never, + }, + ], + session: { + ...baseThread.session, + status: "ready", + orchestrationStatus: "ready", + }, + }, + hasPendingApprovals: false, + hasPendingUserInput: false, + }), + ).toMatchObject({ label: "Completed", pulse: false }); + }); + it("shows completed when there is an unseen completion and no active blocker", () => { expect( resolveThreadStatusPill({ diff --git a/apps/web/src/components/Sidebar.logic.ts b/apps/web/src/components/Sidebar.logic.ts index f08ed212a..5c4a4cc95 100644 --- a/apps/web/src/components/Sidebar.logic.ts +++ b/apps/web/src/components/Sidebar.logic.ts @@ -1,6 +1,10 @@ import type { Thread } from "../types"; import { cn } from "../lib/utils"; -import { findLatestProposedPlan, isLatestTurnSettled } from "../session-logic"; +import { + findLatestProposedPlan, + hasActionableProposedPlan, + isLatestTurnSettled, +} from "../session-logic"; export const THREAD_SELECTION_SAFE_SELECTOR = "[data-thread-item], [data-thread-selection-safe]"; export type SidebarNewThreadEnvMode = "local" | "worktree"; @@ -124,7 +128,9 @@ export function resolveThreadStatusPill(input: { !hasPendingUserInput && thread.interactionMode === "plan" && isLatestTurnSettled(thread.latestTurn, thread.session) && - findLatestProposedPlan(thread.proposedPlans, thread.latestTurn?.turnId ?? null) !== null; + hasActionableProposedPlan( + findLatestProposedPlan(thread.proposedPlans, thread.latestTurn?.turnId ?? null), + ); if (hasPlanReadyPrompt) { return { label: "Plan Ready", diff --git a/apps/web/src/session-logic.test.ts b/apps/web/src/session-logic.test.ts index 74ba3a814..36c5b8619 100644 --- a/apps/web/src/session-logic.test.ts +++ b/apps/web/src/session-logic.test.ts @@ -1,4 +1,10 @@ -import { EventId, MessageId, TurnId, type OrchestrationThreadActivity } from "@t3tools/contracts"; +import { + EventId, + MessageId, + ThreadId, + TurnId, + type OrchestrationThreadActivity, +} from "@t3tools/contracts"; import { describe, expect, it } from "vitest"; import { @@ -10,6 +16,7 @@ import { deriveTimelineEntries, deriveWorkLogEntries, findLatestProposedPlan, + hasActionableProposedPlan, hasToolActivityForTurn, isLatestTurnSettled, } from "./session-logic"; @@ -269,6 +276,8 @@ describe("findLatestProposedPlan", () => { id: "plan:thread-1:turn:turn-1", turnId: TurnId.makeUnsafe("turn-1"), planMarkdown: "# Older", + implementedAt: null, + implementationThreadId: null, createdAt: "2026-02-23T00:00:01.000Z", updatedAt: "2026-02-23T00:00:01.000Z", }, @@ -276,6 +285,8 @@ describe("findLatestProposedPlan", () => { id: "plan:thread-1:turn:turn-1", turnId: TurnId.makeUnsafe("turn-1"), planMarkdown: "# Latest", + implementedAt: null, + implementationThreadId: null, createdAt: "2026-02-23T00:00:01.000Z", updatedAt: "2026-02-23T00:00:02.000Z", }, @@ -283,6 +294,8 @@ describe("findLatestProposedPlan", () => { id: "plan:thread-1:turn:turn-2", turnId: TurnId.makeUnsafe("turn-2"), planMarkdown: "# Different turn", + implementedAt: null, + implementationThreadId: null, createdAt: "2026-02-23T00:00:03.000Z", updatedAt: "2026-02-23T00:00:03.000Z", }, @@ -293,6 +306,8 @@ describe("findLatestProposedPlan", () => { id: "plan:thread-1:turn:turn-1", turnId: "turn-1", planMarkdown: "# Latest", + implementedAt: null, + implementationThreadId: null, createdAt: "2026-02-23T00:00:01.000Z", updatedAt: "2026-02-23T00:00:02.000Z", }); @@ -305,6 +320,8 @@ describe("findLatestProposedPlan", () => { id: "plan:thread-1:turn:turn-1", turnId: TurnId.makeUnsafe("turn-1"), planMarkdown: "# First", + implementedAt: null, + implementationThreadId: null, createdAt: "2026-02-23T00:00:01.000Z", updatedAt: "2026-02-23T00:00:01.000Z", }, @@ -312,6 +329,8 @@ describe("findLatestProposedPlan", () => { id: "plan:thread-1:turn:turn-2", turnId: TurnId.makeUnsafe("turn-2"), planMarkdown: "# Latest", + implementedAt: null, + implementationThreadId: null, createdAt: "2026-02-23T00:00:02.000Z", updatedAt: "2026-02-23T00:00:03.000Z", }, @@ -323,6 +342,36 @@ describe("findLatestProposedPlan", () => { }); }); +describe("hasActionableProposedPlan", () => { + it("returns true for an unimplemented proposed plan", () => { + expect( + hasActionableProposedPlan({ + id: "plan-1", + turnId: TurnId.makeUnsafe("turn-1"), + planMarkdown: "# Plan", + implementedAt: null, + implementationThreadId: null, + createdAt: "2026-02-23T00:00:00.000Z", + updatedAt: "2026-02-23T00:00:01.000Z", + }), + ).toBe(true); + }); + + it("returns false for a proposed plan already implemented elsewhere", () => { + expect( + hasActionableProposedPlan({ + id: "plan-1", + turnId: TurnId.makeUnsafe("turn-1"), + planMarkdown: "# Plan", + implementedAt: "2026-02-23T00:00:02.000Z", + implementationThreadId: ThreadId.makeUnsafe("thread-implement"), + createdAt: "2026-02-23T00:00:00.000Z", + updatedAt: "2026-02-23T00:00:02.000Z", + }), + ).toBe(false); + }); +}); + describe("deriveWorkLogEntries", () => { it("omits tool started entries and keeps completed entries", () => { const activities: OrchestrationThreadActivity[] = [ @@ -531,6 +580,8 @@ describe("deriveTimelineEntries", () => { id: "plan:thread-1:turn:turn-1", turnId: TurnId.makeUnsafe("turn-1"), planMarkdown: "# Ship it", + implementedAt: null, + implementationThreadId: null, createdAt: "2026-02-23T00:00:02.000Z", updatedAt: "2026-02-23T00:00:02.000Z", }, @@ -550,6 +601,8 @@ describe("deriveTimelineEntries", () => { kind: "proposed-plan", proposedPlan: { planMarkdown: "# Ship it", + implementedAt: null, + implementationThreadId: null, }, }); }); diff --git a/apps/web/src/session-logic.ts b/apps/web/src/session-logic.ts index e389f10e2..02042acd9 100644 --- a/apps/web/src/session-logic.ts +++ b/apps/web/src/session-logic.ts @@ -7,6 +7,7 @@ import { type ProviderKind, type ToolLifecycleItemType, type UserInputQuestion, + type ThreadId, type TurnId, } from "@t3tools/contracts"; @@ -72,6 +73,8 @@ export interface LatestProposedPlanState { updatedAt: string; turnId: TurnId | null; planMarkdown: string; + implementedAt: string | null; + implementationThreadId: ThreadId | null; } export type TimelineEntry = @@ -380,6 +383,8 @@ export function findLatestProposedPlan( updatedAt: matchingTurnPlan.updatedAt, turnId: matchingTurnPlan.turnId, planMarkdown: matchingTurnPlan.planMarkdown, + implementedAt: matchingTurnPlan.implementedAt, + implementationThreadId: matchingTurnPlan.implementationThreadId, }; } } @@ -400,9 +405,17 @@ export function findLatestProposedPlan( updatedAt: latestPlan.updatedAt, turnId: latestPlan.turnId, planMarkdown: latestPlan.planMarkdown, + implementedAt: latestPlan.implementedAt, + implementationThreadId: latestPlan.implementationThreadId, }; } +export function hasActionableProposedPlan( + proposedPlan: LatestProposedPlanState | Pick | null, +): boolean { + return proposedPlan !== null && proposedPlan.implementedAt === null; +} + export function deriveWorkLogEntries( activities: ReadonlyArray, latestTurnId: TurnId | undefined, diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index faebe4b0f..73f113313 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -304,6 +304,8 @@ export function syncServerReadModel(state: AppState, readModel: OrchestrationRea id: proposedPlan.id, turnId: proposedPlan.turnId, planMarkdown: proposedPlan.planMarkdown, + implementedAt: proposedPlan.implementedAt, + implementationThreadId: proposedPlan.implementationThreadId, createdAt: proposedPlan.createdAt, updatedAt: proposedPlan.updatedAt, })), diff --git a/apps/web/src/types.ts b/apps/web/src/types.ts index c071fb3f6..32a7fe02b 100644 --- a/apps/web/src/types.ts +++ b/apps/web/src/types.ts @@ -53,6 +53,8 @@ export interface ProposedPlan { id: OrchestrationProposedPlanId; turnId: TurnId | null; planMarkdown: string; + implementedAt: string | null; + implementationThreadId: ThreadId | null; createdAt: string; updatedAt: string; } diff --git a/packages/contracts/src/orchestration.test.ts b/packages/contracts/src/orchestration.test.ts index 25a641edb..136c5a849 100644 --- a/packages/contracts/src/orchestration.test.ts +++ b/packages/contracts/src/orchestration.test.ts @@ -6,6 +6,7 @@ import { DEFAULT_PROVIDER_INTERACTION_MODE, DEFAULT_RUNTIME_MODE, OrchestrationGetTurnDiffInput, + OrchestrationProposedPlan, OrchestrationSession, ProjectCreateCommand, ThreadTurnStartCommand, @@ -21,6 +22,7 @@ const decodeThreadTurnStartCommand = Schema.decodeUnknownEffect(ThreadTurnStartC const decodeThreadTurnStartRequestedPayload = Schema.decodeUnknownEffect( ThreadTurnStartRequestedPayload, ); +const decodeOrchestrationProposedPlan = Schema.decodeUnknownEffect(OrchestrationProposedPlan); const decodeOrchestrationSession = Schema.decodeUnknownEffect(OrchestrationSession); const decodeThreadCreatedPayload = Schema.decodeUnknownEffect(ThreadCreatedPayload); @@ -186,6 +188,31 @@ it.effect("accepts provider-scoped model options in thread.turn.start", () => }), ); +it.effect("accepts a source proposed plan reference in thread.turn.start", () => + Effect.gen(function* () { + const parsed = yield* decodeThreadTurnStartCommand({ + type: "thread.turn.start", + commandId: "cmd-turn-source-plan", + threadId: "thread-2", + message: { + messageId: "msg-source-plan", + role: "user", + text: "implement this", + attachments: [], + }, + sourceProposedPlan: { + threadId: "thread-1", + planId: "plan-1", + }, + createdAt: "2026-01-01T00:00:00.000Z", + }); + assert.deepStrictEqual(parsed.sourceProposedPlan, { + threadId: "thread-1", + planId: "plan-1", + }); + }), +); + it.effect( "decodes thread.turn-start-requested defaults for provider, runtime mode, and interaction mode", () => @@ -198,9 +225,28 @@ it.effect( assert.strictEqual(parsed.provider, undefined); assert.strictEqual(parsed.runtimeMode, DEFAULT_RUNTIME_MODE); assert.strictEqual(parsed.interactionMode, DEFAULT_PROVIDER_INTERACTION_MODE); + assert.strictEqual(parsed.sourceProposedPlan, undefined); }), ); +it.effect("decodes thread.turn-start-requested source proposed plan metadata when present", () => + Effect.gen(function* () { + const parsed = yield* decodeThreadTurnStartRequestedPayload({ + threadId: "thread-2", + messageId: "msg-2", + sourceProposedPlan: { + threadId: "thread-1", + planId: "plan-1", + }, + createdAt: "2026-01-01T00:00:00.000Z", + }); + assert.deepStrictEqual(parsed.sourceProposedPlan, { + threadId: "thread-1", + planId: "plan-1", + }); + }), +); + it.effect("decodes orchestration session runtime mode defaults", () => Effect.gen(function* () { const parsed = yield* decodeOrchestrationSession({ @@ -216,3 +262,33 @@ it.effect("decodes orchestration session runtime mode defaults", () => assert.strictEqual(parsed.runtimeMode, DEFAULT_RUNTIME_MODE); }), ); + +it.effect("defaults proposed plan implementation metadata for historical rows", () => + Effect.gen(function* () { + const parsed = yield* decodeOrchestrationProposedPlan({ + id: "plan-1", + turnId: "turn-1", + planMarkdown: "# Plan", + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + }); + assert.strictEqual(parsed.implementedAt, null); + assert.strictEqual(parsed.implementationThreadId, null); + }), +); + +it.effect("preserves proposed plan implementation metadata when present", () => + Effect.gen(function* () { + const parsed = yield* decodeOrchestrationProposedPlan({ + id: "plan-2", + turnId: "turn-2", + planMarkdown: "# Plan", + implementedAt: "2026-01-02T00:00:00.000Z", + implementationThreadId: "thread-2", + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-02T00:00:00.000Z", + }); + assert.strictEqual(parsed.implementedAt, "2026-01-02T00:00:00.000Z"); + assert.strictEqual(parsed.implementationThreadId, "thread-2"); + }), +); diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 17c5eb21d..4ee53c276 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -163,11 +163,18 @@ export const OrchestrationProposedPlan = Schema.Struct({ id: OrchestrationProposedPlanId, turnId: Schema.NullOr(TurnId), planMarkdown: TrimmedNonEmptyString, + implementedAt: Schema.NullOr(IsoDateTime).pipe(Schema.withDecodingDefault(() => null)), + implementationThreadId: Schema.NullOr(ThreadId).pipe(Schema.withDecodingDefault(() => null)), createdAt: IsoDateTime, updatedAt: IsoDateTime, }); export type OrchestrationProposedPlan = typeof OrchestrationProposedPlan.Type; +const SourceProposedPlanReference = Schema.Struct({ + threadId: ThreadId, + planId: OrchestrationProposedPlanId, +}); + export const OrchestrationSessionStatus = Schema.Literals([ "idle", "starting", @@ -374,6 +381,7 @@ export const ThreadTurnStartCommand = Schema.Struct({ interactionMode: ProviderInteractionMode.pipe( Schema.withDecodingDefault(() => DEFAULT_PROVIDER_INTERACTION_MODE), ), + sourceProposedPlan: Schema.optional(SourceProposedPlanReference), createdAt: IsoDateTime, }); @@ -394,6 +402,7 @@ const ClientThreadTurnStartCommand = Schema.Struct({ assistantDeliveryMode: Schema.optional(AssistantDeliveryMode), runtimeMode: RuntimeMode, interactionMode: ProviderInteractionMode, + sourceProposedPlan: Schema.optional(SourceProposedPlanReference), createdAt: IsoDateTime, }); @@ -676,6 +685,7 @@ export const ThreadTurnStartRequestedPayload = Schema.Struct({ interactionMode: ProviderInteractionMode.pipe( Schema.withDecodingDefault(() => DEFAULT_PROVIDER_INTERACTION_MODE), ), + sourceProposedPlan: Schema.optional(SourceProposedPlanReference), createdAt: IsoDateTime, });