From fc8183ca005b66aef05571cfb42a8666372ebd7a Mon Sep 17 00:00:00 2001 From: Bastian Venegas Date: Wed, 25 Mar 2026 21:27:14 -0300 Subject: [PATCH 1/6] =?UTF-8?q?feat(harness):=20complete=20resume=20loop?= =?UTF-8?q?=20=E2=80=94=20SessionManager=20reads=20bindings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SessionManager now reads Storage.get_binding on session start and injects resumeCursor into params when the provider matches. Completes the bindings table feature: write path (PR #12) + read path (this). Provider mismatch check prevents injecting a Codex cursor into a Cursor session. Caller-supplied resumeCursor is never overwritten. 7 new tests including multi-session concurrent and cross-provider. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/harness/lib/harness/session_manager.ex | 27 ++++++++ apps/harness/test/harness/storage_test.exs | 77 +++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/apps/harness/lib/harness/session_manager.ex b/apps/harness/lib/harness/session_manager.ex index b3eca46f90..5731bfc2ef 100644 --- a/apps/harness/lib/harness/session_manager.ex +++ b/apps/harness/lib/harness/session_manager.ex @@ -37,6 +37,8 @@ defmodule Harness.SessionManager do end defp start_session_with_module(session_module, thread_id, provider, params) do + params = maybe_inject_resume_cursor(params, thread_id, provider) + child_spec = {session_module, %{ @@ -258,6 +260,31 @@ defmodule Harness.SessionManager do SnapshotServer.apply_event(event) end + @doc false + # Inject resumeCursor from SQLite binding if one exists and the provider matches. + # If the caller already supplied a resumeCursor, respect it (don't overwrite). + def maybe_inject_resume_cursor(%{"resumeCursor" => _} = params, _thread_id, _provider) do + params + end + + def maybe_inject_resume_cursor(params, thread_id, provider) do + case Harness.Storage.get_binding(thread_id) do + %{provider: ^provider, resume_cursor_json: cursor_json} when is_binary(cursor_json) -> + case Jason.decode(cursor_json) do + {:ok, cursor} -> + Logger.info("Injecting resumeCursor from binding for #{thread_id} (#{provider})") + Map.put(params, "resumeCursor", cursor) + + _ -> + params + end + + _ -> + # No binding, provider mismatch, or nil cursor — start fresh + params + end + end + defp provider_module("codex"), do: {:ok, CodexSession} defp provider_module("claudeAgent"), do: {:ok, ClaudeSession} defp provider_module("opencode"), do: {:ok, OpenCodeSession} diff --git a/apps/harness/test/harness/storage_test.exs b/apps/harness/test/harness/storage_test.exs index 96b26b9149..1a5dbaaee1 100644 --- a/apps/harness/test/harness/storage_test.exs +++ b/apps/harness/test/harness/storage_test.exs @@ -338,6 +338,83 @@ defmodule Harness.StorageTest do assert Storage.get_binding("t3").provider == "opencode" end + # --- Resume cursor injection tests (SessionManager.maybe_inject_resume_cursor) --- + + alias Harness.SessionManager + + test "injects resumeCursor when binding exists and provider matches" do + cursor = Jason.encode!(%{"threadId" => "codex-abc"}) + :ok = Storage.upsert_binding("t1", "codex", cursor) + + params = %{"threadId" => "t1", "provider" => "codex"} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") + + assert result["resumeCursor"] == %{"threadId" => "codex-abc"} + end + + test "does NOT inject resumeCursor when provider mismatches" do + cursor = Jason.encode!(%{"threadId" => "codex-abc"}) + :ok = Storage.upsert_binding("t1", "codex", cursor) + + params = %{"threadId" => "t1", "provider" => "cursor"} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "cursor") + + refute Map.has_key?(result, "resumeCursor") + end + + test "does NOT inject resumeCursor when no binding exists" do + params = %{"threadId" => "t1", "provider" => "codex"} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") + + refute Map.has_key?(result, "resumeCursor") + end + + test "does NOT overwrite caller-supplied resumeCursor" do + cursor = Jason.encode!(%{"threadId" => "codex-abc"}) + :ok = Storage.upsert_binding("t1", "codex", cursor) + + params = %{"threadId" => "t1", "provider" => "codex", "resumeCursor" => %{"threadId" => "explicit"}} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") + + assert result["resumeCursor"] == %{"threadId" => "explicit"} + end + + test "does NOT inject resumeCursor when cursor_json is nil" do + :ok = Storage.upsert_binding("t1", "codex", nil) + + params = %{"threadId" => "t1", "provider" => "codex"} + result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") + + refute Map.has_key?(result, "resumeCursor") + end + + test "multi-session: concurrent bindings for different threads don't cross-contaminate" do + :ok = Storage.upsert_binding("t1", "codex", Jason.encode!(%{"threadId" => "codex-1"})) + :ok = Storage.upsert_binding("t2", "cursor", Jason.encode!(%{"cursorChatId" => "cursor-1"})) + :ok = Storage.upsert_binding("t3", "opencode", Jason.encode!(%{"sessionId" => "oc-1"})) + + r1 = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "codex") + r2 = SessionManager.maybe_inject_resume_cursor(%{}, "t2", "cursor") + r3 = SessionManager.maybe_inject_resume_cursor(%{}, "t3", "opencode") + + assert r1["resumeCursor"] == %{"threadId" => "codex-1"} + assert r2["resumeCursor"] == %{"cursorChatId" => "cursor-1"} + assert r3["resumeCursor"] == %{"sessionId" => "oc-1"} + end + + test "multi-session: same thread_id with different provider ignores stale binding" do + # Codex wrote a binding for t1 + :ok = Storage.upsert_binding("t1", "codex", Jason.encode!(%{"threadId" => "codex-abc"})) + + # Cursor tries to start on same thread — should NOT get Codex's cursor + result = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "cursor") + refute Map.has_key?(result, "resumeCursor") + + # Codex should still get its own cursor + result = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "codex") + assert result["resumeCursor"] == %{"threadId" => "codex-abc"} + end + # --- Integration: SnapshotServer recovery --- describe "SnapshotServer recovery" do From 836715ce80d141d1c4b0e6563bc0612471bbf968 Mon Sep 17 00:00:00 2001 From: Bastian Venegas Date: Wed, 25 Mar 2026 21:29:08 -0300 Subject: [PATCH 2/6] docs(codex): document why agent_message_delta events are silenced These raw codex/event/agent_message_* events are duplicates of the item-level item/agentMessage/delta (already mapped to content.delta). Mapping both would cause double deltas in the UI. Confirmed via stress-test-real-subagent.ts which treats them as equivalent, and by tracing both paths through the event pipeline. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/server/src/provider/Layers/codexEventMapping.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/server/src/provider/Layers/codexEventMapping.ts b/apps/server/src/provider/Layers/codexEventMapping.ts index fd2d5e1ad4..d3a1fdd916 100644 --- a/apps/server/src/provider/Layers/codexEventMapping.ts +++ b/apps/server/src/provider/Layers/codexEventMapping.ts @@ -514,6 +514,8 @@ const QUIET_UNMAPPED_EVENTS = new Set([ "hook_response", "user", "thinking", + // Raw equivalents of item/agentMessage/delta (already mapped). + // Mapping these would cause duplicate deltas in the UI. "codex/event/agent_message_delta", "codex/event/agent_message", "codex/event/agent_message_content_delta", From 112adc70b3bce65b469f611118d4e609edf2c45a Mon Sep 17 00:00:00 2001 From: Bastian Venegas Date: Wed, 25 Mar 2026 21:30:59 -0300 Subject: [PATCH 3/6] fix(harness): log unmapped events + document agent_message silencing Two observability improvements: 1. mapHarnessEventToRuntimeEvents now logs unmapped events with provider context instead of silently returning []. Surfaces mapping gaps for new event types from any provider. 2. Document why codex/event/agent_message_* events are in QUIET_UNMAPPED_EVENTS: they're redundant with item/agentMessage/delta (already mapped). Mapping both would cause duplicate deltas. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/provider/Layers/HarnessClientAdapter.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/apps/server/src/provider/Layers/HarnessClientAdapter.ts b/apps/server/src/provider/Layers/HarnessClientAdapter.ts index 71442c85a2..a04c2b9cca 100644 --- a/apps/server/src/provider/Layers/HarnessClientAdapter.ts +++ b/apps/server/src/provider/Layers/HarnessClientAdapter.ts @@ -917,10 +917,15 @@ function mapHarnessEventToRuntimeEvents( return providerMapped; } - // Silently drop unrecognised events. Provider-specific notifications that - // pass through the GenServer (e.g. Codex app-server lifecycle events) are - // not bugs — they're simply events with no canonical mapping. The raw event - // stream still contains them for debugging. + // Emit a warning for truly unrecognised events so operators can surface + // mapping gaps. Known-quiet events (already handled by codexEventMapping's + // QUIET_UNMAPPED_EVENTS) won't reach here — they return [] from the + // codexMapToRuntimeEvents fallback above. + if (event.method) { + console.debug( + `[mapHarnessEventToRuntimeEvents] unmapped event: ${event.method} (provider: ${event.provider ?? "unknown"})`, + ); + } return []; } From 3c20bc198859a80baaf90b1244d21e9ac3abb1c8 Mon Sep 17 00:00:00 2001 From: Bastian Venegas Date: Wed, 25 Mar 2026 22:05:03 -0300 Subject: [PATCH 4/6] =?UTF-8?q?fix:=20address=20CodeRabbit=20review=20?= =?UTF-8?q?=E2=80=94=20cursor=20normalization=20+=20log=20dedup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes from review: 1. Normalize Codex resume cursor: {"threadId":"abc"} → "abc" (raw string). CodexSession sends resumeCursor directly as the threadId RPC param — a nested map would be rejected by the app-server. 2. Remove redundant fallback log in mapHarnessEventToRuntimeEvents. codexMapToRuntimeEvents already handles both QUIET_UNMAPPED_EVENTS suppression and console.debug for unknown methods. 3. Log warning on corrupt/un-decodable binding JSON instead of silently returning params. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/harness/lib/harness/session_manager.ex | 11 +++++++++-- apps/harness/test/harness/storage_test.exs | 10 ++++++---- .../src/provider/Layers/HarnessClientAdapter.ts | 12 +++--------- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/apps/harness/lib/harness/session_manager.ex b/apps/harness/lib/harness/session_manager.ex index 5731bfc2ef..b5d51a5049 100644 --- a/apps/harness/lib/harness/session_manager.ex +++ b/apps/harness/lib/harness/session_manager.ex @@ -272,10 +272,12 @@ defmodule Harness.SessionManager do %{provider: ^provider, resume_cursor_json: cursor_json} when is_binary(cursor_json) -> case Jason.decode(cursor_json) do {:ok, cursor} -> + normalized = normalize_resume_cursor(cursor) Logger.info("Injecting resumeCursor from binding for #{thread_id} (#{provider})") - Map.put(params, "resumeCursor", cursor) + Map.put(params, "resumeCursor", normalized) - _ -> + {:error, reason} -> + Logger.warning("Failed to decode binding cursor for #{thread_id}: #{inspect(reason)}") params end @@ -285,6 +287,11 @@ defmodule Harness.SessionManager do end end + # Codex stores {"threadId": "..."} but CodexSession expects the raw string. + # Other providers store richer objects their session modules know how to parse. + defp normalize_resume_cursor(%{"threadId" => tid}) when is_binary(tid), do: tid + defp normalize_resume_cursor(cursor), do: cursor + defp provider_module("codex"), do: {:ok, CodexSession} defp provider_module("claudeAgent"), do: {:ok, ClaudeSession} defp provider_module("opencode"), do: {:ok, OpenCodeSession} diff --git a/apps/harness/test/harness/storage_test.exs b/apps/harness/test/harness/storage_test.exs index 1a5dbaaee1..5a924d0838 100644 --- a/apps/harness/test/harness/storage_test.exs +++ b/apps/harness/test/harness/storage_test.exs @@ -349,7 +349,8 @@ defmodule Harness.StorageTest do params = %{"threadId" => "t1", "provider" => "codex"} result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex") - assert result["resumeCursor"] == %{"threadId" => "codex-abc"} + # Codex cursors are normalized: {"threadId":"abc"} → "abc" (raw string) + assert result["resumeCursor"] == "codex-abc" end test "does NOT inject resumeCursor when provider mismatches" do @@ -397,7 +398,8 @@ defmodule Harness.StorageTest do r2 = SessionManager.maybe_inject_resume_cursor(%{}, "t2", "cursor") r3 = SessionManager.maybe_inject_resume_cursor(%{}, "t3", "opencode") - assert r1["resumeCursor"] == %{"threadId" => "codex-1"} + # Codex cursor normalized to raw string; others kept as maps + assert r1["resumeCursor"] == "codex-1" assert r2["resumeCursor"] == %{"cursorChatId" => "cursor-1"} assert r3["resumeCursor"] == %{"sessionId" => "oc-1"} end @@ -410,9 +412,9 @@ defmodule Harness.StorageTest do result = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "cursor") refute Map.has_key?(result, "resumeCursor") - # Codex should still get its own cursor + # Codex should still get its own cursor (normalized to raw string) result = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "codex") - assert result["resumeCursor"] == %{"threadId" => "codex-abc"} + assert result["resumeCursor"] == "codex-abc" end # --- Integration: SnapshotServer recovery --- diff --git a/apps/server/src/provider/Layers/HarnessClientAdapter.ts b/apps/server/src/provider/Layers/HarnessClientAdapter.ts index a04c2b9cca..c9fa492957 100644 --- a/apps/server/src/provider/Layers/HarnessClientAdapter.ts +++ b/apps/server/src/provider/Layers/HarnessClientAdapter.ts @@ -917,15 +917,9 @@ function mapHarnessEventToRuntimeEvents( return providerMapped; } - // Emit a warning for truly unrecognised events so operators can surface - // mapping gaps. Known-quiet events (already handled by codexEventMapping's - // QUIET_UNMAPPED_EVENTS) won't reach here — they return [] from the - // codexMapToRuntimeEvents fallback above. - if (event.method) { - console.debug( - `[mapHarnessEventToRuntimeEvents] unmapped event: ${event.method} (provider: ${event.provider ?? "unknown"})`, - ); - } + // Unmapped events are already logged by codexMapToRuntimeEvents (which + // handles both QUIET_UNMAPPED_EVENTS suppression and console.debug for + // genuinely unknown methods). No additional logging needed here. return []; } From 054215f90df08ae32db6ec0ded7d0684140ddbea Mon Sep 17 00:00:00 2001 From: Bastian Venegas Date: Wed, 25 Mar 2026 22:09:59 -0300 Subject: [PATCH 5/6] style: format codexEventMapping.test.ts to pass oxfmt Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/server/src/provider/Layers/codexEventMapping.test.ts | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/apps/server/src/provider/Layers/codexEventMapping.test.ts b/apps/server/src/provider/Layers/codexEventMapping.test.ts index 1334b1e2cc..368cab8709 100644 --- a/apps/server/src/provider/Layers/codexEventMapping.test.ts +++ b/apps/server/src/provider/Layers/codexEventMapping.test.ts @@ -4,13 +4,7 @@ */ import { describe, it, expect } from "bun:test"; import { mapToRuntimeEvents } from "./codexEventMapping.ts"; -import { - EventId, - type ProviderEvent, - ProviderItemId, - ThreadId, - TurnId, -} from "@t3tools/contracts"; +import { EventId, type ProviderEvent, ProviderItemId, ThreadId, TurnId } from "@t3tools/contracts"; const threadId = ThreadId.makeUnsafe("thread-1"); From 138cf2044ad8fd5340ec598db2a3149de167e4e3 Mon Sep 17 00:00:00 2001 From: Bastian Venegas Date: Wed, 25 Mar 2026 22:15:51 -0300 Subject: [PATCH 6/6] fix(ci): use vitest instead of bun:test in codexEventMapping tests CI runs vitest, not bun. bun:test is not available in the GitHub Actions runner. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/server/src/provider/Layers/codexEventMapping.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/server/src/provider/Layers/codexEventMapping.test.ts b/apps/server/src/provider/Layers/codexEventMapping.test.ts index 368cab8709..4041e838cf 100644 --- a/apps/server/src/provider/Layers/codexEventMapping.test.ts +++ b/apps/server/src/provider/Layers/codexEventMapping.test.ts @@ -2,7 +2,7 @@ * Unit tests for codexEventMapping — calls mapToRuntimeEvents directly * without the full adapter/stream infrastructure. */ -import { describe, it, expect } from "bun:test"; +import { describe, it, expect } from "vitest"; import { mapToRuntimeEvents } from "./codexEventMapping.ts"; import { EventId, type ProviderEvent, ProviderItemId, ThreadId, TurnId } from "@t3tools/contracts";