diff --git a/README.md b/README.md index cac7c737ff..87106b15d7 100644 --- a/README.md +++ b/README.md @@ -29,17 +29,17 @@ Once you manage several concurrent agent sessions — each with its own protocol The engine owns supervision, crash isolation, and per-session memory containment. Node keeps SDK access, TypeScript contracts, desktop integration, and the orchestration layer. Both runtimes share SQLite for durability. -| Concern | Owner | -| ---------------------------------- | ----------- | -| Provider process lifecycle | Elixir | -| Crash isolation + supervision | Elixir | -| Per-session memory containment | Elixir | +| Concern | Owner | +| ---------------------------------- | ------------- | +| Provider process lifecycle | Elixir | +| Crash isolation + supervision | Elixir | +| Per-session memory containment | Elixir | | Session + event durability | Both (SQLite) | -| Pending request crash recovery | Elixir | -| Canonical event mapping (TS types) | Node | -| Claude Agent SDK | Node | -| Browser/Electron WebSocket | Node | -| Desktop + product integration | Node | +| Pending request crash recovery | Elixir | +| Canonical event mapping (TS types) | Node | +| Claude Agent SDK | Node | +| Browser/Electron WebSocket | Node | +| Desktop + product integration | Node | --- @@ -140,19 +140,19 @@ python3 output/stress-test/viz-real.py ### Elixir harness (`apps/harness/`) -| Module | Role | -| ------------------ | ------------------------------------------------- | -| `SessionManager` | DynamicSupervisor routing, session lifecycle | -| `CodexSession` | Codex JSON-RPC GenServer | -| `CursorSession` | Cursor stream-json + tool mapping | -| `OpenCodeSession` | OpenCode HTTP+SSE + tool mapping | -| `ClaudeSession` | Claude CLI GenServer (stress tests only) | -| `MockSession` | Configurable mock for stress testing | -| `SnapshotServer` | In-memory event store + WAL replay + recovery | -| `Storage` | SQLite durability (sessions, events, pending reqs) | -| `Projector` | Pure event → snapshot projection | -| `ModelDiscovery` | CLI-based model listing with ETS cache | -| `HarnessChannel` | Phoenix Channel — single WS entry point | +| Module | Role | +| ----------------- | -------------------------------------------------- | +| `SessionManager` | DynamicSupervisor routing, session lifecycle | +| `CodexSession` | Codex JSON-RPC GenServer | +| `CursorSession` | Cursor stream-json + tool mapping | +| `OpenCodeSession` | OpenCode HTTP+SSE + tool mapping | +| `ClaudeSession` | Claude CLI GenServer (stress tests only) | +| `MockSession` | Configurable mock for stress testing | +| `SnapshotServer` | In-memory event store + WAL replay + recovery | +| `Storage` | SQLite durability (sessions, events, pending reqs) | +| `Projector` | Pure event → snapshot projection | +| `ModelDiscovery` | CLI-based model listing with ETS cache | +| `HarnessChannel` | Phoenix Channel — single WS entry point | ### Node integration @@ -160,8 +160,8 @@ python3 output/stress-test/viz-real.py | ------------------------- | ----------------------------------------------------------- | | `HarnessClientAdapter.ts` | Single adapter: all 13 `ProviderAdapterShape` methods | | `HarnessClientManager.ts` | Phoenix Channel WS client with reconnection | -| `codexEventMapping.ts` | Canonical event mapping (shared with existing CodexAdapter) | -| `ClaudeAdapter.ts` | Claude Agent SDK adapter (Node-native, not via harness) | +| `codexEventMapping.ts` | Canonical event mapping (shared with existing CodexAdapter) | +| `ClaudeAdapter.ts` | Claude Agent SDK adapter (Node-native, not via harness) | --- diff --git a/apps/harness/lib/harness/providers/codex_session.ex b/apps/harness/lib/harness/providers/codex_session.ex index ce59551220..96a7573f48 100644 --- a/apps/harness/lib/harness/providers/codex_session.ex +++ b/apps/harness/lib/harness/providers/codex_session.ex @@ -739,6 +739,7 @@ defmodule Harness.Providers.CodexSession do {%{from: nil, method: "thread/start"}, pending} -> codex_id = get_in(result, ["thread", "id"]) state = %{state | pending: pending, codex_thread_id: codex_id, ready: true} + persist_binding(state) Enum.each(state.ready_waiters, &GenServer.reply(&1, :ok)) %{state | ready_waiters: []} @@ -746,6 +747,7 @@ defmodule Harness.Providers.CodexSession do # Resume succeeded — same handling as thread/start codex_id = get_in(result, ["thread", "id"]) state = %{state | pending: pending, codex_thread_id: codex_id, ready: true} + persist_binding(state) Enum.each(state.ready_waiters, &GenServer.reply(&1, :ok)) %{state | ready_waiters: []} @@ -766,6 +768,13 @@ defmodule Harness.Providers.CodexSession do message = Map.get(error, "message", "Unknown error") state = %{state | pending: pending} + # Delete stale binding before any retry — prevents livelock if + # the cursor is permanently invalid. This runs for both recoverable + # and non-recoverable failures intentionally: if resume failed, the + # cursor is suspect. The recoverable path falls back to thread/start + # which upserts a fresh binding on success. + Harness.Storage.delete_binding(state.thread_id) + if is_recoverable_resume_error?(message) do Logger.info( "Thread resume failed (recoverable): #{message}, falling back to thread/start" @@ -998,6 +1007,15 @@ defmodule Harness.Providers.CodexSession do Port.command(port, message <> "\n") end + defp persist_binding(state) do + cursor_json = + if state.codex_thread_id do + Jason.encode!(%{"threadId" => state.codex_thread_id}) + end + + Harness.Storage.upsert_binding(state.thread_id, state.provider, cursor_json) + end + defp emit_event(state, kind, method, payload) do event = Event.new(%{ diff --git a/apps/harness/lib/harness/storage.ex b/apps/harness/lib/harness/storage.ex index b0649b554f..77b1cacb38 100644 --- a/apps/harness/lib/harness/storage.ex +++ b/apps/harness/lib/harness/storage.ex @@ -64,6 +64,21 @@ defmodule Harness.Storage do GenServer.call(__MODULE__, {:get_pending_requests, thread_id}) end + @doc "Upsert a binding (resume cursor) for a thread. Opaque pass-through — harness never parses resume_cursor_json." + def upsert_binding(thread_id, provider, resume_cursor_json) do + GenServer.call(__MODULE__, {:upsert_binding, thread_id, provider, resume_cursor_json}) + end + + @doc "Get the binding for a thread. Returns %{thread_id, provider, resume_cursor_json} or nil." + def get_binding(thread_id) do + GenServer.call(__MODULE__, {:get_binding, thread_id}) + end + + @doc "Delete a binding. Called on any resume failure (recoverable or not) before retry." + def delete_binding(thread_id) do + GenServer.call(__MODULE__, {:delete_binding, thread_id}) + end + @doc "Truncate all tables. Test-only." def reset! do GenServer.call(__MODULE__, :reset) @@ -131,7 +146,23 @@ defmodule Harness.Storage do {:reply, result, state} end + def handle_call({:upsert_binding, thread_id, provider, resume_cursor_json}, _from, %{conn: conn} = state) do + result = do_upsert_binding(conn, thread_id, provider, resume_cursor_json) + {:reply, result, state} + end + + def handle_call({:get_binding, thread_id}, _from, %{conn: conn} = state) do + result = do_get_binding(conn, thread_id) + {:reply, result, state} + end + + def handle_call({:delete_binding, thread_id}, _from, %{conn: conn} = state) do + result = do_delete_binding(conn, thread_id) + {:reply, result, state} + end + def handle_call(:reset, _from, %{conn: conn} = state) do + Sqlite3.execute(conn, "DELETE FROM harness_bindings") Sqlite3.execute(conn, "DELETE FROM harness_pending_requests") Sqlite3.execute(conn, "DELETE FROM harness_events") Sqlite3.execute(conn, "DELETE FROM harness_sessions") @@ -230,6 +261,21 @@ defmodule Harness.Storage do CREATE INDEX IF NOT EXISTS idx_pending_reqs_thread ON harness_pending_requests(thread_id) """) + + # Bindings: provider-specific resume cursors that survive session close. + # Rows are UPSERTed on thread/start or thread/resume success. + # Deleted on any resume failure (recoverable or not) before retry. + # Expected provider values: "codex", "cursor", "opencode" + # (Claude uses Node SDK directly, not the harness.) + Sqlite3.execute(conn, """ + CREATE TABLE IF NOT EXISTS harness_bindings ( + thread_id TEXT PRIMARY KEY, + provider TEXT NOT NULL, + resume_cursor_json TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) end # --- Event operations --- @@ -400,6 +446,53 @@ defmodule Harness.Storage do |> Enum.map(&row_to_pending_request/1) end + # --- Binding operations --- + + defp do_upsert_binding(conn, thread_id, provider, resume_cursor_json) do + now = DateTime.utc_now() |> DateTime.to_iso8601() + + sql = """ + INSERT INTO harness_bindings (thread_id, provider, resume_cursor_json, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5) + ON CONFLICT(thread_id) DO UPDATE SET + provider = excluded.provider, + resume_cursor_json = excluded.resume_cursor_json, + updated_at = excluded.updated_at + """ + + {:ok, stmt} = Sqlite3.prepare(conn, sql) + + try do + :ok = Sqlite3.bind(stmt, [thread_id, provider, resume_cursor_json, now, now]) + :done = Sqlite3.step(conn, stmt) + :ok + after + Sqlite3.release(conn, stmt) + end + end + + defp do_get_binding(conn, thread_id) do + sql = "SELECT thread_id, provider, resume_cursor_json FROM harness_bindings WHERE thread_id = ?1" + + case query_one(conn, sql, [thread_id]) do + [tid, provider, cursor_json] -> %{thread_id: tid, provider: provider, resume_cursor_json: cursor_json} + nil -> nil + end + end + + defp do_delete_binding(conn, thread_id) do + sql = "DELETE FROM harness_bindings WHERE thread_id = ?1" + {:ok, stmt} = Sqlite3.prepare(conn, sql) + + try do + :ok = Sqlite3.bind(stmt, [thread_id]) + :done = Sqlite3.step(conn, stmt) + :ok + after + Sqlite3.release(conn, stmt) + end + end + # --- Session queries --- defp do_get_all_sessions(conn) do diff --git a/apps/harness/test/harness/storage_test.exs b/apps/harness/test/harness/storage_test.exs index e19c7bce29..96b26b9149 100644 --- a/apps/harness/test/harness/storage_test.exs +++ b/apps/harness/test/harness/storage_test.exs @@ -275,6 +275,69 @@ defmodule Harness.StorageTest do assert length(Storage.get_pending_requests()) == 0 end + # --- Binding tests --- + + test "upsert_binding creates and retrieves a binding" do + cursor = Jason.encode!(%{"threadId" => "codex-thread-abc"}) + :ok = Storage.upsert_binding("t1", "codex", cursor) + + binding = Storage.get_binding("t1") + assert binding.thread_id == "t1" + assert binding.provider == "codex" + assert binding.resume_cursor_json == cursor + end + + test "upsert_binding updates existing binding" do + cursor1 = Jason.encode!(%{"threadId" => "old-thread"}) + cursor2 = Jason.encode!(%{"threadId" => "new-thread"}) + + :ok = Storage.upsert_binding("t1", "codex", cursor1) + :ok = Storage.upsert_binding("t1", "codex", cursor2) + + binding = Storage.get_binding("t1") + assert binding.resume_cursor_json == cursor2 + end + + test "get_binding returns nil for unknown thread" do + assert Storage.get_binding("nonexistent") == nil + end + + test "delete_binding removes the row" do + :ok = Storage.upsert_binding("t1", "codex", ~s({"threadId":"abc"})) + assert Storage.get_binding("t1") != nil + + :ok = Storage.delete_binding("t1") + assert Storage.get_binding("t1") == nil + end + + test "delete_binding is idempotent" do + :ok = Storage.delete_binding("nonexistent") + assert Storage.get_binding("nonexistent") == nil + end + + test "upsert_binding with nil cursor" do + :ok = Storage.upsert_binding("t1", "codex", nil) + binding = Storage.get_binding("t1") + assert binding.thread_id == "t1" + assert binding.resume_cursor_json == nil + end + + test "reset! clears bindings" do + :ok = Storage.upsert_binding("t1", "codex", ~s({"threadId":"abc"})) + Storage.reset!() + assert Storage.get_binding("t1") == nil + end + + test "bindings for different providers coexist" do + :ok = Storage.upsert_binding("t1", "codex", ~s({"threadId":"codex-1"})) + :ok = Storage.upsert_binding("t2", "cursor", ~s({"cursorChatId":"cursor-1"})) + :ok = Storage.upsert_binding("t3", "opencode", ~s({"sessionId":"oc-1"})) + + assert Storage.get_binding("t1").provider == "codex" + assert Storage.get_binding("t2").provider == "cursor" + assert Storage.get_binding("t3").provider == "opencode" + end + # --- Integration: SnapshotServer recovery --- describe "SnapshotServer recovery" do