diff --git a/server/src/__tests__/heartbeat-workspace-session.test.ts b/server/src/__tests__/heartbeat-workspace-session.test.ts index 7fab2b429d..c862f26499 100644 --- a/server/src/__tests__/heartbeat-workspace-session.test.ts +++ b/server/src/__tests__/heartbeat-workspace-session.test.ts @@ -7,6 +7,7 @@ import { formatRuntimeWorkspaceWarningLog, prioritizeProjectWorkspaceCandidatesForRun, parseSessionCompactionPolicy, + readRuntimeStateSessionSnapshot, resolveRuntimeSessionParamsForWorkspace, shouldResetTaskSessionForWake, type ResolvedWorkspaceForRun, @@ -120,6 +121,51 @@ describe("resolveRuntimeSessionParamsForWorkspace", () => { }); }); +describe("readRuntimeStateSessionSnapshot", () => { + it("restores session params and display id from runtime state json", () => { + const snapshot = readRuntimeStateSessionSnapshot( + { + sessionParams: { + sessionId: "session-1", + cwd: "/tmp/project", + workspaceId: "workspace-1", + }, + sessionDisplayId: "session-1", + }, + codexSessionCodec, + ); + + expect(snapshot).toEqual({ + params: { + sessionId: "session-1", + cwd: "/tmp/project", + workspaceId: "workspace-1", + }, + displayId: "session-1", + }); + }); + + it("falls back to deriving display id from session params", () => { + const snapshot = readRuntimeStateSessionSnapshot( + { + sessionParams: { + sessionId: "session-2", + cwd: "/tmp/project", + }, + }, + codexSessionCodec, + ); + + expect(snapshot).toEqual({ + params: { + sessionId: "session-2", + cwd: "/tmp/project", + }, + displayId: "session-2", + }); + }); +}); + describe("shouldResetTaskSessionForWake", () => { it("resets session context on assignment wake", () => { expect(shouldResetTaskSessionForWake({ wakeReason: "issue_assigned" })).toBe(true); diff --git a/server/src/services/heartbeat.ts b/server/src/services/heartbeat.ts index c909b9b77c..d0631c6570 100644 --- a/server/src/services/heartbeat.ts +++ b/server/src/services/heartbeat.ts @@ -708,6 +708,23 @@ function normalizeSessionParams(params: Record | null | undefin return Object.keys(params).length > 0 ? params : null; } +export function readRuntimeStateSessionSnapshot( + stateJson: Record | null | undefined, + codec: AdapterSessionCodec, +) { + const state = parseObject(stateJson); + const params = normalizeSessionParams(codec.deserialize(parseObject(state.sessionParams))); + const displayId = truncateDisplayId( + readNonEmptyString(state.sessionDisplayId) ?? + (codec.getDisplayId ? codec.getDisplayId(params) : null) ?? + readNonEmptyString(params?.sessionId), + ); + return { + params, + displayId, + }; +} + function resolveNextSessionState(input: { codec: AdapterSessionCodec; adapterResult: AdapterExecutionResult; @@ -848,6 +865,107 @@ export function heartbeatService(db: Db) { .then((rows) => rows[0] ?? null); } + async function resolveRuntimeStateSessionSnapshotForRun(input: { + agent: typeof agents.$inferSelect; + runtime: typeof agentRuntimeState.$inferSelect; + codec: AdapterSessionCodec; + }) { + const { agent, runtime, codec } = input; + const snapshot = readRuntimeStateSessionSnapshot(runtime.stateJson, codec); + const runtimeSessionId = + snapshot.displayId ?? + readNonEmptyString(snapshot.params?.sessionId) ?? + readNonEmptyString(runtime.sessionId); + if (!runtimeSessionId) return snapshot; + + const fallbackAgentHomeCwd = resolveDefaultAgentWorkspaceDir(agent.id); + const snapshotCwd = readNonEmptyString(snapshot.params?.cwd); + if (snapshot.params && snapshotCwd && path.resolve(snapshotCwd) !== path.resolve(fallbackAgentHomeCwd)) { + return snapshot; + } + + const latestTaskSession = await db + .select() + .from(agentTaskSessions) + .where( + and( + eq(agentTaskSessions.companyId, agent.companyId), + eq(agentTaskSessions.agentId, agent.id), + eq(agentTaskSessions.adapterType, agent.adapterType), + ), + ) + .orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt)) + .limit(1) + .then((rows) => rows[0] ?? null); + const latestTaskSessionParams = normalizeSessionParams( + codec.deserialize(latestTaskSession?.sessionParamsJson ?? null), + ); + const latestTaskSessionId = truncateDisplayId( + latestTaskSession?.sessionDisplayId ?? + (codec.getDisplayId ? codec.getDisplayId(latestTaskSessionParams) : null) ?? + readNonEmptyString(latestTaskSessionParams?.sessionId), + ); + const latestTaskSessionCwd = readNonEmptyString(latestTaskSessionParams?.cwd); + if ( + latestTaskSessionParams && + latestTaskSessionId === runtimeSessionId && + latestTaskSessionCwd && + path.resolve(latestTaskSessionCwd) !== path.resolve(fallbackAgentHomeCwd) + ) { + return { + params: latestTaskSessionParams, + displayId: latestTaskSessionId, + }; + } + + const latestRun = await getLatestRunForSession(agent.id, runtimeSessionId); + const latestContext = parseObject(latestRun?.contextSnapshot); + const workspace = parseObject(latestContext.paperclipWorkspace); + const cwd = readNonEmptyString(workspace.cwd); + + if ( + snapshot.params && + (!snapshotCwd || path.resolve(snapshotCwd) !== path.resolve(fallbackAgentHomeCwd) || !cwd) + ) { + return snapshot; + } + + if (snapshot.params && cwd && path.resolve(cwd) !== path.resolve(snapshotCwd ?? fallbackAgentHomeCwd)) { + return { + params: { + ...snapshot.params, + cwd, + ...(readNonEmptyString(workspace.workspaceId) ? { workspaceId: readNonEmptyString(workspace.workspaceId) } : {}), + ...(readNonEmptyString(workspace.repoUrl) ? { repoUrl: readNonEmptyString(workspace.repoUrl) } : {}), + ...(readNonEmptyString(workspace.repoRef) ? { repoRef: readNonEmptyString(workspace.repoRef) } : {}), + }, + displayId: snapshot.displayId ?? runtimeSessionId, + }; + } + + if (!cwd) { + return { + params: { sessionId: runtimeSessionId }, + displayId: snapshot.displayId ?? runtimeSessionId, + }; + } + + const params: Record = { + sessionId: runtimeSessionId, + cwd, + }; + const workspaceId = readNonEmptyString(workspace.workspaceId); + const repoUrl = readNonEmptyString(workspace.repoUrl); + const repoRef = readNonEmptyString(workspace.repoRef); + if (workspaceId) params.workspaceId = workspaceId; + if (repoUrl) params.repoUrl = repoUrl; + if (repoRef) params.repoRef = repoRef; + return { + params, + displayId: snapshot.displayId ?? runtimeSessionId, + }; + } + async function getOldestRunForSession(agentId: string, sessionId: string) { return db .select({ @@ -1020,7 +1138,14 @@ export function heartbeatService(db: Db) { } const runtimeForRun = await getRuntimeState(agent.id); - return runtimeForRun?.sessionId ?? null; + if (!runtimeForRun) return null; + const codec = getAdapterSessionCodec(agent.adapterType); + const runtimeSnapshot = await resolveRuntimeStateSessionSnapshotForRun({ + agent, + runtime: runtimeForRun, + codec, + }); + return runtimeSnapshot.displayId ?? runtimeForRun.sessionId ?? null; } async function resolveExplicitResumeSessionOverride( @@ -1845,7 +1970,11 @@ export function heartbeatService(db: Db) { agent: typeof agents.$inferSelect, run: typeof heartbeatRuns.$inferSelect, result: AdapterExecutionResult, - session: { legacySessionId: string | null }, + session: { + legacySessionId: string | null; + params: Record | null; + displayId: string | null; + }, normalizedUsage?: UsageTotals | null, ) { await ensureRuntimeState(agent); @@ -1865,6 +1994,10 @@ export function heartbeatService(db: Db) { .set({ adapterType: agent.adapterType, sessionId: session.legacySessionId, + stateJson: { + sessionParams: session.params, + sessionDisplayId: session.displayId, + }, lastRunId: run.id, lastRunStatus: run.status, lastError: result.errorMessage ?? null, @@ -2028,7 +2161,14 @@ export function heartbeatService(db: Db) { const previousSessionParams = explicitResumeSessionParams ?? (explicitResumeSessionDisplayId ? { sessionId: explicitResumeSessionDisplayId } : null) ?? - normalizeSessionParams(sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null)); + normalizeSessionParams(sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null)) ?? + (!resetTaskSession + ? (await resolveRuntimeStateSessionSnapshotForRun({ + agent, + runtime, + codec: sessionCodec, + })).params + : null); const config = parseObject(agent.adapterConfig); const executionWorkspaceMode = resolveExecutionWorkspaceMode({ projectPolicy: projectExecutionWorkspacePolicy, @@ -2693,6 +2833,8 @@ export function heartbeatService(db: Db) { if (finalizedRun) { await updateRuntimeState(agent, finalizedRun, adapterResult, { legacySessionId: nextSessionState.legacySessionId, + params: nextSessionState.params, + displayId: nextSessionState.displayId, }, normalizedUsage); if (taskKey) { if (adapterResult.clearSession || (!nextSessionState.params && !nextSessionState.displayId)) { @@ -2762,6 +2904,8 @@ export function heartbeatService(db: Db) { errorMessage: message, }, { legacySessionId: runtimeForAdapter.sessionId, + params: runtimeForAdapter.sessionParams, + displayId: runtimeForAdapter.sessionDisplayId, }); if (taskKey && (previousSessionParams || previousSessionDisplayId || taskSession)) {