Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,8 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
threadId: event.payload.threadId,
turnId: event.payload.proposedPlan.turnId,
planMarkdown: event.payload.proposedPlan.planMarkdown,
implementedAt: event.payload.proposedPlan.implementedAt,
implementationThreadId: event.payload.proposedPlan.implementationThreadId,
createdAt: event.payload.proposedPlan.createdAt,
updatedAt: event.payload.proposedPlan.updatedAt,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => {

yield* sql`DELETE FROM projection_projects`;
yield* sql`DELETE FROM projection_state`;
yield* sql`DELETE FROM projection_thread_proposed_plans`;
yield* sql`DELETE FROM projection_turns`;

yield* sql`
Expand Down Expand Up @@ -101,6 +102,29 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => {
)
`;

yield* sql`
INSERT INTO projection_thread_proposed_plans (
plan_id,
thread_id,
turn_id,
plan_markdown,
implemented_at,
implementation_thread_id,
created_at,
updated_at
)
VALUES (
'plan-1',
'thread-1',
'turn-1',
'# Ship it',
'2026-02-24T00:00:05.500Z',
'thread-2',
'2026-02-24T00:00:05.000Z',
'2026-02-24T00:00:05.500Z'
)
`;

yield* sql`
INSERT INTO projection_thread_activities (
activity_id,
Expand Down Expand Up @@ -253,7 +277,17 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => {
updatedAt: "2026-02-24T00:00:05.000Z",
},
],
proposedPlans: [],
proposedPlans: [
{
id: "plan-1",
turnId: asTurnId("turn-1"),
planMarkdown: "# Ship it",
implementedAt: "2026-02-24T00:00:05.500Z",
implementationThreadId: ThreadId.makeUnsafe("thread-2"),
createdAt: "2026-02-24T00:00:05.000Z",
updatedAt: "2026-02-24T00:00:05.500Z",
},
],
activities: [
{
id: asEventId("activity-1"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () {
thread_id AS "threadId",
turn_id AS "turnId",
plan_markdown AS "planMarkdown",
implemented_at AS "implementedAt",
implementation_thread_id AS "implementationThreadId",
created_at AS "createdAt",
updated_at AS "updatedAt"
FROM projection_thread_proposed_plans
Expand Down Expand Up @@ -435,6 +437,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () {
id: row.planId,
turnId: row.turnId,
planMarkdown: row.planMarkdown,
implementedAt: row.implementedAt,
implementationThreadId: row.implementationThreadId,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ const make = Effect.gen(function* () {
threadProposedPlans: ReadonlyArray<{
id: string;
createdAt: string;
implementedAt: string | null;
implementationThreadId: ThreadId | null;
}>;
planId: string;
turnId?: TurnId;
Expand All @@ -693,6 +695,8 @@ const make = Effect.gen(function* () {
id: input.planId,
turnId: input.turnId ?? null,
planMarkdown,
implementedAt: existingPlan?.implementedAt ?? null,
implementationThreadId: existingPlan?.implementationThreadId ?? null,
createdAt: existingPlan?.createdAt ?? input.createdAt,
updatedAt: input.updatedAt,
},
Expand All @@ -706,6 +710,8 @@ const make = Effect.gen(function* () {
threadProposedPlans: ReadonlyArray<{
id: string;
createdAt: string;
implementedAt: string | null;
implementationThreadId: ThreadId | null;
}>;
planId: string;
turnId?: TurnId;
Expand Down
251 changes: 251 additions & 0 deletions apps/server/src/orchestration/decider.projectScripts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
MessageId,
ProjectId,
ThreadId,
TurnId,
} from "@t3tools/contracts";
import { describe, expect, it } from "vitest";
import { Effect } from "effect";
Expand Down Expand Up @@ -201,6 +202,256 @@ describe("decider project scripts", () => {
});
});

it("marks the source proposed plan implemented when starting a new implementation thread", async () => {
const now = new Date().toISOString();
const initial = createEmptyReadModel(now);
const withProject = await Effect.runPromise(
projectEvent(initial, {
sequence: 1,
eventId: asEventId("evt-project-create"),
aggregateKind: "project",
aggregateId: asProjectId("project-1"),
type: "project.created",
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-project-create"),
causationEventId: null,
correlationId: CommandId.makeUnsafe("cmd-project-create"),
metadata: {},
payload: {
projectId: asProjectId("project-1"),
title: "Project",
workspaceRoot: "/tmp/project",
defaultModel: null,
scripts: [],
createdAt: now,
updatedAt: now,
},
}),
);
const withSourceThread = await Effect.runPromise(
projectEvent(withProject, {
sequence: 2,
eventId: asEventId("evt-thread-create-source"),
aggregateKind: "thread",
aggregateId: ThreadId.makeUnsafe("thread-plan"),
type: "thread.created",
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-thread-create-source"),
causationEventId: null,
correlationId: CommandId.makeUnsafe("cmd-thread-create-source"),
metadata: {},
payload: {
threadId: ThreadId.makeUnsafe("thread-plan"),
projectId: asProjectId("project-1"),
title: "Plan Thread",
model: "gpt-5-codex",
interactionMode: "plan",
runtimeMode: "approval-required",
branch: null,
worktreePath: null,
createdAt: now,
updatedAt: now,
},
}),
);
const withTargetThread = await Effect.runPromise(
projectEvent(withSourceThread, {
sequence: 3,
eventId: asEventId("evt-thread-create-target"),
aggregateKind: "thread",
aggregateId: ThreadId.makeUnsafe("thread-implement"),
type: "thread.created",
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-thread-create-target"),
causationEventId: null,
correlationId: CommandId.makeUnsafe("cmd-thread-create-target"),
metadata: {},
payload: {
threadId: ThreadId.makeUnsafe("thread-implement"),
projectId: asProjectId("project-1"),
title: "Implementation Thread",
model: "gpt-5-codex",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
branch: null,
worktreePath: null,
createdAt: now,
updatedAt: now,
},
}),
);
const readModel = await Effect.runPromise(
projectEvent(withTargetThread, {
sequence: 4,
eventId: asEventId("evt-plan-upsert"),
aggregateKind: "thread",
aggregateId: ThreadId.makeUnsafe("thread-plan"),
type: "thread.proposed-plan-upserted",
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-plan-upsert"),
causationEventId: null,
correlationId: CommandId.makeUnsafe("cmd-plan-upsert"),
metadata: {},
payload: {
threadId: ThreadId.makeUnsafe("thread-plan"),
proposedPlan: {
id: "plan-1",
turnId: TurnId.makeUnsafe("turn-1"),
planMarkdown: "# Plan",
implementedAt: null,
implementationThreadId: null,
createdAt: now,
updatedAt: now,
},
},
}),
);

const result = await Effect.runPromise(
decideOrchestrationCommand({
command: {
type: "thread.turn.start",
commandId: CommandId.makeUnsafe("cmd-turn-start-source-plan"),
threadId: ThreadId.makeUnsafe("thread-implement"),
message: {
messageId: asMessageId("message-user-2"),
role: "user",
text: "PLEASE IMPLEMENT THIS PLAN:\n# Plan",
attachments: [],
},
sourceProposedPlan: {
threadId: ThreadId.makeUnsafe("thread-plan"),
planId: "plan-1",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
},
readModel,
}),
);

expect(Array.isArray(result)).toBe(true);
const events = Array.isArray(result) ? result : [result];
expect(events).toHaveLength(3);
expect(events[2]?.type).toBe("thread.proposed-plan-upserted");
expect(events[2]?.aggregateId).toBe("thread-plan");
if (events[2]?.type !== "thread.proposed-plan-upserted") {
return;
}
expect(events[2].payload.proposedPlan).toMatchObject({
id: "plan-1",
implementedAt: now,
implementationThreadId: "thread-implement",
});
});

it("rejects thread.turn.start when the source proposed plan is missing", async () => {
const now = new Date().toISOString();
const initial = createEmptyReadModel(now);
const withProject = await Effect.runPromise(
projectEvent(initial, {
sequence: 1,
eventId: asEventId("evt-project-create"),
aggregateKind: "project",
aggregateId: asProjectId("project-1"),
type: "project.created",
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-project-create"),
causationEventId: null,
correlationId: CommandId.makeUnsafe("cmd-project-create"),
metadata: {},
payload: {
projectId: asProjectId("project-1"),
title: "Project",
workspaceRoot: "/tmp/project",
defaultModel: null,
scripts: [],
createdAt: now,
updatedAt: now,
},
}),
);
const withSourceThread = await Effect.runPromise(
projectEvent(withProject, {
sequence: 2,
eventId: asEventId("evt-thread-create-source"),
aggregateKind: "thread",
aggregateId: ThreadId.makeUnsafe("thread-plan"),
type: "thread.created",
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-thread-create-source"),
causationEventId: null,
correlationId: CommandId.makeUnsafe("cmd-thread-create-source"),
metadata: {},
payload: {
threadId: ThreadId.makeUnsafe("thread-plan"),
projectId: asProjectId("project-1"),
title: "Plan Thread",
model: "gpt-5-codex",
interactionMode: "plan",
runtimeMode: "approval-required",
branch: null,
worktreePath: null,
createdAt: now,
updatedAt: now,
},
}),
);
const readModel = await Effect.runPromise(
projectEvent(withSourceThread, {
sequence: 3,
eventId: asEventId("evt-thread-create-target"),
aggregateKind: "thread",
aggregateId: ThreadId.makeUnsafe("thread-implement"),
type: "thread.created",
occurredAt: now,
commandId: CommandId.makeUnsafe("cmd-thread-create-target"),
causationEventId: null,
correlationId: CommandId.makeUnsafe("cmd-thread-create-target"),
metadata: {},
payload: {
threadId: ThreadId.makeUnsafe("thread-implement"),
projectId: asProjectId("project-1"),
title: "Implementation Thread",
model: "gpt-5-codex",
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
branch: null,
worktreePath: null,
createdAt: now,
updatedAt: now,
},
}),
);

await expect(
Effect.runPromise(
decideOrchestrationCommand({
command: {
type: "thread.turn.start",
commandId: CommandId.makeUnsafe("cmd-turn-start-missing-source-plan"),
threadId: ThreadId.makeUnsafe("thread-implement"),
message: {
messageId: asMessageId("message-user-3"),
role: "user",
text: "PLEASE IMPLEMENT THIS PLAN:\n# Missing",
attachments: [],
},
sourceProposedPlan: {
threadId: ThreadId.makeUnsafe("thread-plan"),
planId: "plan-missing",
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
},
readModel,
}),
),
).rejects.toThrow("Proposed plan 'plan-missing' does not exist on thread 'thread-plan'.");
});

it("emits thread.runtime-mode-set from thread.runtime-mode.set", async () => {
const now = new Date().toISOString();
const initial = createEmptyReadModel(now);
Expand Down
Loading