Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions server/src/__tests__/heartbeat-workspace-session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
formatRuntimeWorkspaceWarningLog,
prioritizeProjectWorkspaceCandidatesForRun,
parseSessionCompactionPolicy,
readRuntimeStateSessionSnapshot,
resolveRuntimeSessionParamsForWorkspace,
shouldResetTaskSessionForWake,
type ResolvedWorkspaceForRun,
Expand Down Expand Up @@ -120,6 +121,51 @@ describe("resolveRuntimeSessionParamsForWorkspace", () => {
});
});

describe("readRuntimeStateSessionSnapshot", () => {
it("restores session params and display id from runtime state json", () => {
const snapshot = readRuntimeStateSessionSnapshot(
{
sessionParams: {
sessionId: "session-1",
cwd: "/tmp/project",
workspaceId: "workspace-1",
},
sessionDisplayId: "session-1",
},
codexSessionCodec,
);

expect(snapshot).toEqual({
params: {
sessionId: "session-1",
cwd: "/tmp/project",
workspaceId: "workspace-1",
},
displayId: "session-1",
});
});

it("falls back to deriving display id from session params", () => {
const snapshot = readRuntimeStateSessionSnapshot(
{
sessionParams: {
sessionId: "session-2",
cwd: "/tmp/project",
},
},
codexSessionCodec,
);

expect(snapshot).toEqual({
params: {
sessionId: "session-2",
cwd: "/tmp/project",
},
displayId: "session-2",
});
});
});

describe("shouldResetTaskSessionForWake", () => {
it("resets session context on assignment wake", () => {
expect(shouldResetTaskSessionForWake({ wakeReason: "issue_assigned" })).toBe(true);
Expand Down
150 changes: 147 additions & 3 deletions server/src/services/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,23 @@ function normalizeSessionParams(params: Record<string, unknown> | null | undefin
return Object.keys(params).length > 0 ? params : null;
}

export function readRuntimeStateSessionSnapshot(
stateJson: Record<string, unknown> | null | undefined,
codec: AdapterSessionCodec,
) {
const state = parseObject(stateJson);
const params = normalizeSessionParams(codec.deserialize(parseObject(state.sessionParams)));
const displayId = truncateDisplayId(
readNonEmptyString(state.sessionDisplayId) ??
(codec.getDisplayId ? codec.getDisplayId(params) : null) ??
readNonEmptyString(params?.sessionId),
);
return {
params,
displayId,
};
}

function resolveNextSessionState(input: {
codec: AdapterSessionCodec;
adapterResult: AdapterExecutionResult;
Expand Down Expand Up @@ -848,6 +865,107 @@ export function heartbeatService(db: Db) {
.then((rows) => rows[0] ?? null);
}

async function resolveRuntimeStateSessionSnapshotForRun(input: {
agent: typeof agents.$inferSelect;
runtime: typeof agentRuntimeState.$inferSelect;
codec: AdapterSessionCodec;
}) {
const { agent, runtime, codec } = input;
const snapshot = readRuntimeStateSessionSnapshot(runtime.stateJson, codec);
const runtimeSessionId =
snapshot.displayId ??
readNonEmptyString(snapshot.params?.sessionId) ??
readNonEmptyString(runtime.sessionId);
if (!runtimeSessionId) return snapshot;

const fallbackAgentHomeCwd = resolveDefaultAgentWorkspaceDir(agent.id);
const snapshotCwd = readNonEmptyString(snapshot.params?.cwd);
if (snapshot.params && snapshotCwd && path.resolve(snapshotCwd) !== path.resolve(fallbackAgentHomeCwd)) {
return snapshot;
}

const latestTaskSession = await db
.select()
.from(agentTaskSessions)
.where(
and(
eq(agentTaskSessions.companyId, agent.companyId),
eq(agentTaskSessions.agentId, agent.id),
eq(agentTaskSessions.adapterType, agent.adapterType),
),
)
.orderBy(desc(agentTaskSessions.updatedAt), desc(agentTaskSessions.createdAt))
.limit(1)
.then((rows) => rows[0] ?? null);
const latestTaskSessionParams = normalizeSessionParams(
codec.deserialize(latestTaskSession?.sessionParamsJson ?? null),
);
const latestTaskSessionId = truncateDisplayId(
latestTaskSession?.sessionDisplayId ??
(codec.getDisplayId ? codec.getDisplayId(latestTaskSessionParams) : null) ??
readNonEmptyString(latestTaskSessionParams?.sessionId),
);
const latestTaskSessionCwd = readNonEmptyString(latestTaskSessionParams?.cwd);
if (
latestTaskSessionParams &&
latestTaskSessionId === runtimeSessionId &&
latestTaskSessionCwd &&
path.resolve(latestTaskSessionCwd) !== path.resolve(fallbackAgentHomeCwd)
) {
return {
params: latestTaskSessionParams,
displayId: latestTaskSessionId,
};
}

const latestRun = await getLatestRunForSession(agent.id, runtimeSessionId);
const latestContext = parseObject(latestRun?.contextSnapshot);
const workspace = parseObject(latestContext.paperclipWorkspace);
const cwd = readNonEmptyString(workspace.cwd);

if (
snapshot.params &&
(!snapshotCwd || path.resolve(snapshotCwd) !== path.resolve(fallbackAgentHomeCwd) || !cwd)
) {
return snapshot;
}

if (snapshot.params && cwd && path.resolve(cwd) !== path.resolve(snapshotCwd ?? fallbackAgentHomeCwd)) {
return {
params: {
...snapshot.params,
cwd,
...(readNonEmptyString(workspace.workspaceId) ? { workspaceId: readNonEmptyString(workspace.workspaceId) } : {}),
...(readNonEmptyString(workspace.repoUrl) ? { repoUrl: readNonEmptyString(workspace.repoUrl) } : {}),
...(readNonEmptyString(workspace.repoRef) ? { repoRef: readNonEmptyString(workspace.repoRef) } : {}),
},
displayId: snapshot.displayId ?? runtimeSessionId,
};
}

if (!cwd) {
return {
params: { sessionId: runtimeSessionId },
displayId: snapshot.displayId ?? runtimeSessionId,
};
}

const params: Record<string, unknown> = {
sessionId: runtimeSessionId,
cwd,
};
const workspaceId = readNonEmptyString(workspace.workspaceId);
const repoUrl = readNonEmptyString(workspace.repoUrl);
const repoRef = readNonEmptyString(workspace.repoRef);
if (workspaceId) params.workspaceId = workspaceId;
if (repoUrl) params.repoUrl = repoUrl;
if (repoRef) params.repoRef = repoRef;
return {
params,
displayId: snapshot.displayId ?? runtimeSessionId,
};
}

async function getOldestRunForSession(agentId: string, sessionId: string) {
return db
.select({
Expand Down Expand Up @@ -1020,7 +1138,14 @@ export function heartbeatService(db: Db) {
}

const runtimeForRun = await getRuntimeState(agent.id);
return runtimeForRun?.sessionId ?? null;
if (!runtimeForRun) return null;
const codec = getAdapterSessionCodec(agent.adapterType);
const runtimeSnapshot = await resolveRuntimeStateSessionSnapshotForRun({
agent,
runtime: runtimeForRun,
codec,
});
return runtimeSnapshot.displayId ?? runtimeForRun.sessionId ?? null;
}

async function resolveExplicitResumeSessionOverride(
Expand Down Expand Up @@ -1845,7 +1970,11 @@ export function heartbeatService(db: Db) {
agent: typeof agents.$inferSelect,
run: typeof heartbeatRuns.$inferSelect,
result: AdapterExecutionResult,
session: { legacySessionId: string | null },
session: {
legacySessionId: string | null;
params: Record<string, unknown> | null;
displayId: string | null;
},
normalizedUsage?: UsageTotals | null,
) {
await ensureRuntimeState(agent);
Expand All @@ -1865,6 +1994,10 @@ export function heartbeatService(db: Db) {
.set({
adapterType: agent.adapterType,
sessionId: session.legacySessionId,
stateJson: {
sessionParams: session.params,
sessionDisplayId: session.displayId,
},
lastRunId: run.id,
lastRunStatus: run.status,
lastError: result.errorMessage ?? null,
Expand Down Expand Up @@ -2028,7 +2161,14 @@ export function heartbeatService(db: Db) {
const previousSessionParams =
explicitResumeSessionParams ??
(explicitResumeSessionDisplayId ? { sessionId: explicitResumeSessionDisplayId } : null) ??
normalizeSessionParams(sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null));
normalizeSessionParams(sessionCodec.deserialize(taskSessionForRun?.sessionParamsJson ?? null)) ??
(!resetTaskSession
? (await resolveRuntimeStateSessionSnapshotForRun({
agent,
runtime,
codec: sessionCodec,
})).params
: null);
const config = parseObject(agent.adapterConfig);
const executionWorkspaceMode = resolveExecutionWorkspaceMode({
projectPolicy: projectExecutionWorkspacePolicy,
Expand Down Expand Up @@ -2693,6 +2833,8 @@ export function heartbeatService(db: Db) {
if (finalizedRun) {
await updateRuntimeState(agent, finalizedRun, adapterResult, {
legacySessionId: nextSessionState.legacySessionId,
params: nextSessionState.params,
displayId: nextSessionState.displayId,
}, normalizedUsage);
if (taskKey) {
if (adapterResult.clearSession || (!nextSessionState.params && !nextSessionState.displayId)) {
Expand Down Expand Up @@ -2762,6 +2904,8 @@ export function heartbeatService(db: Db) {
errorMessage: message,
}, {
legacySessionId: runtimeForAdapter.sessionId,
params: runtimeForAdapter.sessionParams,
displayId: runtimeForAdapter.sessionDisplayId,
});

if (taskKey && (previousSessionParams || previousSessionDisplayId || taskSession)) {
Expand Down
Loading