diff --git a/apps/harness/lib/harness/session_manager.ex b/apps/harness/lib/harness/session_manager.ex index b3eca46f90..b5d51a5049 100644 --- a/apps/harness/lib/harness/session_manager.ex +++ b/apps/harness/lib/harness/session_manager.ex @@ -37,6 +37,8 @@ defmodule Harness.SessionManager do end defp start_session_with_module(session_module, thread_id, provider, params) do + params = maybe_inject_resume_cursor(params, thread_id, provider) + child_spec = {session_module, %{ @@ -258,6 +260,38 @@ defmodule Harness.SessionManager do SnapshotServer.apply_event(event) end + @doc false + # Inject resumeCursor from SQLite binding if one exists and the provider matches. + # If the caller already supplied a resumeCursor, respect it (don't overwrite). + def maybe_inject_resume_cursor(%{"resumeCursor" => _} = params, _thread_id, _provider) do + params + end + + def maybe_inject_resume_cursor(params, thread_id, provider) do + case Harness.Storage.get_binding(thread_id) do + %{provider: ^provider, resume_cursor_json: cursor_json} when is_binary(cursor_json) -> + case Jason.decode(cursor_json) do + {:ok, cursor} -> + normalized = normalize_resume_cursor(cursor) + Logger.info("Injecting resumeCursor from binding for #{thread_id} (#{provider})") + Map.put(params, "resumeCursor", normalized) + + {:error, reason} -> + Logger.warning("Failed to decode binding cursor for #{thread_id}: #{inspect(reason)}") + params + end + + _ -> + # No binding, provider mismatch, or nil cursor — start fresh + params + end + end + + # Codex stores {"threadId": "..."} but CodexSession expects the raw string. + # Other providers store richer objects their session modules know how to parse. + defp normalize_resume_cursor(%{"threadId" => tid}) when is_binary(tid), do: tid + defp normalize_resume_cursor(cursor), do: cursor + defp provider_module("codex"), do: {:ok, CodexSession} defp provider_module("claudeAgent"), do: {:ok, ClaudeSession} defp provider_module("opencode"), do: {:ok, OpenCodeSession} diff --git a/apps/harness/test/harness/storage_test.exs b/apps/harness/test/harness/storage_test.exs index 96b26b9149..5a924d0838 100644 --- a/apps/harness/test/harness/storage_test.exs +++ b/apps/harness/test/harness/storage_test.exs @@ -338,6 +338,85 @@ defmodule Harness.StorageTest do assert Storage.get_binding("t3").provider == "opencode" end + # --- Resume cursor injection tests (SessionManager.maybe_inject_resume_cursor) --- + + alias Harness.SessionManager + + test "injects resumeCursor when binding exists and provider matches" do + cursor = Jason.encode!(%{"threadId" => "codex-abc"}) + :ok = Storage.upsert_binding("t1", "codex", cursor) + + params = %{"threadId" => "t1", "provider" => "codex"} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") + + # Codex cursors are normalized: {"threadId":"abc"} → "abc" (raw string) + assert result["resumeCursor"] == "codex-abc" + end + + test "does NOT inject resumeCursor when provider mismatches" do + cursor = Jason.encode!(%{"threadId" => "codex-abc"}) + :ok = Storage.upsert_binding("t1", "codex", cursor) + + params = %{"threadId" => "t1", "provider" => "cursor"} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "cursor") + + refute Map.has_key?(result, "resumeCursor") + end + + test "does NOT inject resumeCursor when no binding exists" do + params = %{"threadId" => "t1", "provider" => "codex"} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") + + refute Map.has_key?(result, "resumeCursor") + end + + test "does NOT overwrite caller-supplied resumeCursor" do + cursor = Jason.encode!(%{"threadId" => "codex-abc"}) + :ok = Storage.upsert_binding("t1", "codex", cursor) + + params = %{"threadId" => "t1", "provider" => "codex", "resumeCursor" => %{"threadId" => "explicit"}} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") + + assert result["resumeCursor"] == %{"threadId" => "explicit"} + end + + test "does NOT inject resumeCursor when cursor_json is nil" do + :ok = Storage.upsert_binding("t1", "codex", nil) + + params = %{"threadId" => "t1", "provider" => "codex"} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") + + refute Map.has_key?(result, "resumeCursor") + end + + test "multi-session: concurrent bindings for different threads don't cross-contaminate" do + :ok = Storage.upsert_binding("t1", "codex", Jason.encode!(%{"threadId" => "codex-1"})) + :ok = Storage.upsert_binding("t2", "cursor", Jason.encode!(%{"cursorChatId" => "cursor-1"})) + :ok = Storage.upsert_binding("t3", "opencode", Jason.encode!(%{"sessionId" => "oc-1"})) + + r1 = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "codex") + r2 = SessionManager.maybe_inject_resume_cursor(%{}, "t2", "cursor") + r3 = SessionManager.maybe_inject_resume_cursor(%{}, "t3", "opencode") + + # Codex cursor normalized to raw string; others kept as maps + assert r1["resumeCursor"] == "codex-1" + assert r2["resumeCursor"] == %{"cursorChatId" => "cursor-1"} + assert r3["resumeCursor"] == %{"sessionId" => "oc-1"} + end + + test "multi-session: same thread_id with different provider ignores stale binding" do + # Codex wrote a binding for t1 + :ok = Storage.upsert_binding("t1", "codex", Jason.encode!(%{"threadId" => "codex-abc"})) + + # Cursor tries to start on same thread — should NOT get Codex's cursor + result = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "cursor") + refute Map.has_key?(result, "resumeCursor") + + # Codex should still get its own cursor (normalized to raw string) + result = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "codex") + assert result["resumeCursor"] == "codex-abc" + end + # --- Integration: SnapshotServer recovery --- describe "SnapshotServer recovery" do diff --git a/apps/server/src/provider/Layers/HarnessClientAdapter.ts b/apps/server/src/provider/Layers/HarnessClientAdapter.ts index 71442c85a2..c9fa492957 100644 --- a/apps/server/src/provider/Layers/HarnessClientAdapter.ts +++ b/apps/server/src/provider/Layers/HarnessClientAdapter.ts @@ -917,10 +917,9 @@ function mapHarnessEventToRuntimeEvents( return providerMapped; } - // Silently drop unrecognised events. Provider-specific notifications that - // pass through the GenServer (e.g. Codex app-server lifecycle events) are - // not bugs — they're simply events with no canonical mapping. The raw event - // stream still contains them for debugging. + // Unmapped events are already logged by codexMapToRuntimeEvents (which + // handles both QUIET_UNMAPPED_EVENTS suppression and console.debug for + // genuinely unknown methods). No additional logging needed here. return []; } diff --git a/apps/server/src/provider/Layers/codexEventMapping.test.ts b/apps/server/src/provider/Layers/codexEventMapping.test.ts index 1334b1e2cc..4041e838cf 100644 --- a/apps/server/src/provider/Layers/codexEventMapping.test.ts +++ b/apps/server/src/provider/Layers/codexEventMapping.test.ts @@ -2,15 +2,9 @@ * Unit tests for codexEventMapping — calls mapToRuntimeEvents directly * without the full adapter/stream infrastructure. */ -import { describe, it, expect } from "bun:test"; +import { describe, it, expect } from "vitest"; import { mapToRuntimeEvents } from "./codexEventMapping.ts"; -import { - EventId, - type ProviderEvent, - ProviderItemId, - ThreadId, - TurnId, -} from "@t3tools/contracts"; +import { EventId, type ProviderEvent, ProviderItemId, ThreadId, TurnId } from "@t3tools/contracts"; const threadId = ThreadId.makeUnsafe("thread-1"); diff --git a/apps/server/src/provider/Layers/codexEventMapping.ts b/apps/server/src/provider/Layers/codexEventMapping.ts index fd2d5e1ad4..d3a1fdd916 100644 --- a/apps/server/src/provider/Layers/codexEventMapping.ts +++ b/apps/server/src/provider/Layers/codexEventMapping.ts @@ -514,6 +514,8 @@ const QUIET_UNMAPPED_EVENTS = new Set([ "hook_response", "user", "thinking", + // Raw equivalents of item/agentMessage/delta (already mapped). + // Mapping these would cause duplicate deltas in the UI. "codex/event/agent_message_delta", "codex/event/agent_message", "codex/event/agent_message_content_delta",