Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ Once you manage several concurrent agent sessions — each with its own protocol

The engine owns supervision, crash isolation, and per-session memory containment. Node keeps SDK access, TypeScript contracts, desktop integration, and the orchestration layer. Both runtimes share SQLite for durability.

| Concern | Owner |
| ---------------------------------- | ----------- |
| Provider process lifecycle | Elixir |
| Crash isolation + supervision | Elixir |
| Per-session memory containment | Elixir |
| Concern | Owner |
| ---------------------------------- | ------------- |
| Provider process lifecycle | Elixir |
| Crash isolation + supervision | Elixir |
| Per-session memory containment | Elixir |
| Session + event durability | Both (SQLite) |
| Pending request crash recovery | Elixir |
| Canonical event mapping (TS types) | Node |
| Claude Agent SDK | Node |
| Browser/Electron WebSocket | Node |
| Desktop + product integration | Node |
| Pending request crash recovery | Elixir |
| Canonical event mapping (TS types) | Node |
| Claude Agent SDK | Node |
| Browser/Electron WebSocket | Node |
| Desktop + product integration | Node |

---

Expand Down Expand Up @@ -140,28 +140,28 @@ python3 output/stress-test/viz-real.py

### Elixir harness (`apps/harness/`)

| Module | Role |
| ------------------ | ------------------------------------------------- |
| `SessionManager` | DynamicSupervisor routing, session lifecycle |
| `CodexSession` | Codex JSON-RPC GenServer |
| `CursorSession` | Cursor stream-json + tool mapping |
| `OpenCodeSession` | OpenCode HTTP+SSE + tool mapping |
| `ClaudeSession` | Claude CLI GenServer (stress tests only) |
| `MockSession` | Configurable mock for stress testing |
| `SnapshotServer` | In-memory event store + WAL replay + recovery |
| `Storage` | SQLite durability (sessions, events, pending reqs) |
| `Projector` | Pure event → snapshot projection |
| `ModelDiscovery` | CLI-based model listing with ETS cache |
| `HarnessChannel` | Phoenix Channel — single WS entry point |
| Module | Role |
| ----------------- | -------------------------------------------------- |
| `SessionManager` | DynamicSupervisor routing, session lifecycle |
| `CodexSession` | Codex JSON-RPC GenServer |
| `CursorSession` | Cursor stream-json + tool mapping |
| `OpenCodeSession` | OpenCode HTTP+SSE + tool mapping |
| `ClaudeSession` | Claude CLI GenServer (stress tests only) |
| `MockSession` | Configurable mock for stress testing |
| `SnapshotServer` | In-memory event store + WAL replay + recovery |
| `Storage` | SQLite durability (sessions, events, pending reqs) |
| `Projector` | Pure event → snapshot projection |
| `ModelDiscovery` | CLI-based model listing with ETS cache |
| `HarnessChannel` | Phoenix Channel — single WS entry point |

### Node integration

| File | Role |
| ------------------------- | ----------------------------------------------------------- |
| `HarnessClientAdapter.ts` | Single adapter: all 13 `ProviderAdapterShape` methods |
| `HarnessClientManager.ts` | Phoenix Channel WS client with reconnection |
| `codexEventMapping.ts` | Canonical event mapping (shared with existing CodexAdapter) |
| `ClaudeAdapter.ts` | Claude Agent SDK adapter (Node-native, not via harness) |
| `codexEventMapping.ts` | Canonical event mapping (shared with existing CodexAdapter) |
| `ClaudeAdapter.ts` | Claude Agent SDK adapter (Node-native, not via harness) |

---

Expand Down
18 changes: 18 additions & 0 deletions apps/harness/lib/harness/providers/codex_session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -739,13 +739,15 @@ defmodule Harness.Providers.CodexSession do
{%{from: nil, method: "thread/start"}, pending} ->
codex_id = get_in(result, ["thread", "id"])
state = %{state | pending: pending, codex_thread_id: codex_id, ready: true}
persist_binding(state)
Enum.each(state.ready_waiters, &GenServer.reply(&1, :ok))
%{state | ready_waiters: []}

{%{from: nil, method: "thread/resume"}, pending} ->
# Resume succeeded — same handling as thread/start
codex_id = get_in(result, ["thread", "id"])
state = %{state | pending: pending, codex_thread_id: codex_id, ready: true}
persist_binding(state)
Enum.each(state.ready_waiters, &GenServer.reply(&1, :ok))
%{state | ready_waiters: []}

Expand All @@ -766,6 +768,13 @@ defmodule Harness.Providers.CodexSession do
message = Map.get(error, "message", "Unknown error")
state = %{state | pending: pending}

# Delete stale binding before any retry — prevents livelock if
# the cursor is permanently invalid. This runs for both recoverable
# and non-recoverable failures intentionally: if resume failed, the
# cursor is suspect. The recoverable path falls back to thread/start
# which upserts a fresh binding on success.
Harness.Storage.delete_binding(state.thread_id)

if is_recoverable_resume_error?(message) do
Logger.info(
"Thread resume failed (recoverable): #{message}, falling back to thread/start"
Expand Down Expand Up @@ -998,6 +1007,15 @@ defmodule Harness.Providers.CodexSession do
Port.command(port, message <> "\n")
end

# Persists the provider resume cursor for this thread so a later session can
# resume where this one left off. When no codex thread id is known yet the
# binding is stored with a nil cursor (the row still records the provider).
defp persist_binding(state) do
  cursor =
    case state.codex_thread_id do
      nil -> nil
      codex_id -> Jason.encode!(%{"threadId" => codex_id})
    end

  Harness.Storage.upsert_binding(state.thread_id, state.provider, cursor)
end

defp emit_event(state, kind, method, payload) do
event =
Event.new(%{
Expand Down
93 changes: 93 additions & 0 deletions apps/harness/lib/harness/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ defmodule Harness.Storage do
GenServer.call(__MODULE__, {:get_pending_requests, thread_id})
end

@doc """
Insert or refresh the binding (resume cursor) for a thread.

The cursor JSON is an opaque pass-through — the harness stores it verbatim
and never parses `resume_cursor_json`.
"""
def upsert_binding(thread_id, provider, resume_cursor_json) do
  request = {:upsert_binding, thread_id, provider, resume_cursor_json}
  GenServer.call(__MODULE__, request)
end

@doc """
Fetch the binding stored for `thread_id`.

Returns `%{thread_id: _, provider: _, resume_cursor_json: _}`, or `nil`
when no binding exists.
"""
def get_binding(thread_id) do
  GenServer.call(__MODULE__, {:get_binding, thread_id})
end

@doc """
Remove the binding for `thread_id`, if any.

Invoked on any resume failure — recoverable or not — before a retry, so a
stale cursor can never be replayed.
"""
def delete_binding(thread_id) do
  GenServer.call(__MODULE__, {:delete_binding, thread_id})
end

@doc "Truncate all tables. Test-only."
def reset! do
GenServer.call(__MODULE__, :reset)
Expand Down Expand Up @@ -131,7 +146,23 @@ defmodule Harness.Storage do
{:reply, result, state}
end

# Server-side binding operations. Each clause delegates straight to the
# corresponding do_* helper against the SQLite connection held in state.
def handle_call({:upsert_binding, thread_id, provider, cursor_json}, _from, %{conn: db} = state) do
  {:reply, do_upsert_binding(db, thread_id, provider, cursor_json), state}
end

def handle_call({:get_binding, thread_id}, _from, %{conn: db} = state) do
  {:reply, do_get_binding(db, thread_id), state}
end

def handle_call({:delete_binding, thread_id}, _from, %{conn: db} = state) do
  {:reply, do_delete_binding(db, thread_id), state}
end

def handle_call(:reset, _from, %{conn: conn} = state) do
Sqlite3.execute(conn, "DELETE FROM harness_bindings")
Sqlite3.execute(conn, "DELETE FROM harness_pending_requests")
Sqlite3.execute(conn, "DELETE FROM harness_events")
Sqlite3.execute(conn, "DELETE FROM harness_sessions")
Expand Down Expand Up @@ -230,6 +261,21 @@ defmodule Harness.Storage do
CREATE INDEX IF NOT EXISTS idx_pending_reqs_thread
ON harness_pending_requests(thread_id)
""")

# Bindings: provider-specific resume cursors that survive session close.
# Rows are UPSERTed on thread/start or thread/resume success.
# Deleted on any resume failure (recoverable or not) before retry.
# Expected provider values: "codex", "cursor", "opencode"
# (Claude uses Node SDK directly, not the harness.)
Sqlite3.execute(conn, """
CREATE TABLE IF NOT EXISTS harness_bindings (
thread_id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
Comment on lines +271 to +273
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Binding keying currently breaks multi-provider coexistence.

With thread_id as the sole key (Line 272) and lookup/delete keyed only by thread_id (Lines 474-485), one provider’s upsert will overwrite another provider’s cursor for the same thread. That conflicts with the PR objective to support multi-provider coexistence.

💡 Proposed fix (provider-scoped bindings)
-  `@doc` "Get the binding for a thread. Returns %{thread_id, provider, resume_cursor_json} or nil."
-  def get_binding(thread_id) do
-    GenServer.call(__MODULE__, {:get_binding, thread_id})
+  `@doc` "Get the binding for a thread+provider. Returns %{thread_id, provider, resume_cursor_json} or nil."
+  def get_binding(thread_id, provider) do
+    GenServer.call(__MODULE__, {:get_binding, thread_id, provider})
   end

-  `@doc` "Delete a binding. Called when resume fails non-recoverably (before fresh start)."
-  def delete_binding(thread_id) do
-    GenServer.call(__MODULE__, {:delete_binding, thread_id})
+  `@doc` "Delete a binding for a thread+provider."
+  def delete_binding(thread_id, provider) do
+    GenServer.call(__MODULE__, {:delete_binding, thread_id, provider})
   end
-  def handle_call({:get_binding, thread_id}, _from, %{conn: conn} = state) do
-    result = do_get_binding(conn, thread_id)
+  def handle_call({:get_binding, thread_id, provider}, _from, %{conn: conn} = state) do
+    result = do_get_binding(conn, thread_id, provider)
     {:reply, result, state}
   end

-  def handle_call({:delete_binding, thread_id}, _from, %{conn: conn} = state) do
-    result = do_delete_binding(conn, thread_id)
+  def handle_call({:delete_binding, thread_id, provider}, _from, %{conn: conn} = state) do
+    result = do_delete_binding(conn, thread_id, provider)
     {:reply, result, state}
   end
-    CREATE TABLE IF NOT EXISTS harness_bindings (
-      thread_id TEXT PRIMARY KEY,
+    CREATE TABLE IF NOT EXISTS harness_bindings (
+      thread_id TEXT NOT NULL,
       provider TEXT NOT NULL,
       resume_cursor_json TEXT,
       created_at TEXT NOT NULL,
-      updated_at TEXT NOT NULL
+      updated_at TEXT NOT NULL,
+      PRIMARY KEY (thread_id, provider)
     )
-    ON CONFLICT(thread_id) DO UPDATE SET
+    ON CONFLICT(thread_id, provider) DO UPDATE SET
       provider = excluded.provider,
       resume_cursor_json = excluded.resume_cursor_json,
       updated_at = excluded.updated_at
-  defp do_get_binding(conn, thread_id) do
-    sql = "SELECT thread_id, provider, resume_cursor_json FROM harness_bindings WHERE thread_id = ?1"
+  defp do_get_binding(conn, thread_id, provider) do
+    sql = """
+    SELECT thread_id, provider, resume_cursor_json
+    FROM harness_bindings
+    WHERE thread_id = ?1 AND provider = ?2
+    """
-    case query_one(conn, sql, [thread_id]) do
+    case query_one(conn, sql, [thread_id, provider]) do
       [tid, provider, cursor_json] -> %{thread_id: tid, provider: provider, resume_cursor_json: cursor_json}
       nil -> nil
     end
   end

-  defp do_delete_binding(conn, thread_id) do
-    sql = "DELETE FROM harness_bindings WHERE thread_id = ?1"
+  defp do_delete_binding(conn, thread_id, provider) do
+    sql = "DELETE FROM harness_bindings WHERE thread_id = ?1 AND provider = ?2"
...
-      :ok = Sqlite3.bind(stmt, [thread_id])
+      :ok = Sqlite3.bind(stmt, [thread_id, provider])

Also applies to: 451-460, 474-485

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/harness/lib/harness/storage.ex` around lines 271 - 273, The
harness_bindings table and binding operations currently use thread_id as the
sole primary key so one provider can overwrite another; update the schema to use
a composite primary key (thread_id, provider) and adjust all binding operations
(the upsert path and the lookup/delete paths referenced around lines 451-460 and
474-485) to include provider in WHERE/DELETE/INSERT/UPSERT logic so bindings are
provider-scoped; ensure the column names (thread_id, provider) and table name
(harness_bindings) are used consistently and any existing queries/functions that
fetch or remove bindings accept and pass the provider parameter.

resume_cursor_json TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
end

# --- Event operations ---
Expand Down Expand Up @@ -400,6 +446,53 @@ defmodule Harness.Storage do
|> Enum.map(&row_to_pending_request/1)
end

# --- Binding operations ---

# Writes (or refreshes) the single binding row for a thread. created_at is
# only set on first insert; updated_at is bumped on every upsert.
# NOTE(review): rows are keyed by thread_id alone, so a second provider
# bound to the same thread would overwrite the first's cursor — confirm the
# one-provider-per-thread invariant holds upstream.
defp do_upsert_binding(conn, thread_id, provider, resume_cursor_json) do
  timestamp = DateTime.to_iso8601(DateTime.utc_now())

  sql = """
  INSERT INTO harness_bindings (thread_id, provider, resume_cursor_json, created_at, updated_at)
  VALUES (?1, ?2, ?3, ?4, ?5)
  ON CONFLICT(thread_id) DO UPDATE SET
    provider = excluded.provider,
    resume_cursor_json = excluded.resume_cursor_json,
    updated_at = excluded.updated_at
  """

  {:ok, stmt} = Sqlite3.prepare(conn, sql)

  try do
    :ok = Sqlite3.bind(stmt, [thread_id, provider, resume_cursor_json, timestamp, timestamp])
    :done = Sqlite3.step(conn, stmt)
    :ok
  after
    # Release the prepared statement even if bind/step fails.
    Sqlite3.release(conn, stmt)
  end
end

# Looks up the binding row for a thread; nil when absent. An unexpected row
# shape deliberately raises (CaseClauseError) rather than being masked.
defp do_get_binding(conn, thread_id) do
  sql = "SELECT thread_id, provider, resume_cursor_json FROM harness_bindings WHERE thread_id = ?1"

  case query_one(conn, sql, [thread_id]) do
    nil ->
      nil

    [id, prov, cursor] ->
      %{thread_id: id, provider: prov, resume_cursor_json: cursor}
  end
end

# Deletes the binding row for a thread. Deleting a missing row is a no-op,
# which makes the public delete_binding/1 idempotent.
defp do_delete_binding(conn, thread_id) do
  {:ok, stmt} = Sqlite3.prepare(conn, "DELETE FROM harness_bindings WHERE thread_id = ?1")

  try do
    :ok = Sqlite3.bind(stmt, [thread_id])
    :done = Sqlite3.step(conn, stmt)
    :ok
  after
    # Always release the statement handle, even on bind/step failure.
    Sqlite3.release(conn, stmt)
  end
end

# --- Session queries ---

defp do_get_all_sessions(conn) do
Expand Down
63 changes: 63 additions & 0 deletions apps/harness/test/harness/storage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,69 @@ defmodule Harness.StorageTest do
assert length(Storage.get_pending_requests()) == 0
end

# --- Binding tests ---

test "upsert_binding creates and retrieves a binding" do
  cursor = Jason.encode!(%{"threadId" => "codex-thread-abc"})

  assert :ok = Storage.upsert_binding("t1", "codex", cursor)

  assert %{thread_id: "t1", provider: "codex", resume_cursor_json: ^cursor} =
           Storage.get_binding("t1")
end

test "upsert_binding updates existing binding" do
  stale = Jason.encode!(%{"threadId" => "old-thread"})
  fresh = Jason.encode!(%{"threadId" => "new-thread"})

  :ok = Storage.upsert_binding("t1", "codex", stale)
  :ok = Storage.upsert_binding("t1", "codex", fresh)

  # Second upsert must win.
  assert Storage.get_binding("t1").resume_cursor_json == fresh
end

test "get_binding returns nil for unknown thread" do
  assert is_nil(Storage.get_binding("nonexistent"))
end

test "delete_binding removes the row" do
  :ok = Storage.upsert_binding("t1", "codex", ~s({"threadId":"abc"}))
  refute is_nil(Storage.get_binding("t1"))

  assert :ok = Storage.delete_binding("t1")
  assert is_nil(Storage.get_binding("t1"))
end

test "delete_binding is idempotent" do
  # Deleting a row that never existed still succeeds.
  assert :ok = Storage.delete_binding("nonexistent")
  assert is_nil(Storage.get_binding("nonexistent"))
end

test "upsert_binding with nil cursor" do
  :ok = Storage.upsert_binding("t1", "codex", nil)

  # The row exists even when no cursor has been captured yet.
  assert %{thread_id: "t1", resume_cursor_json: nil} = Storage.get_binding("t1")
end

test "reset! clears bindings" do
  :ok = Storage.upsert_binding("t1", "codex", ~s({"threadId":"abc"}))

  Storage.reset!()

  assert is_nil(Storage.get_binding("t1"))
end

test "bindings for different providers coexist" do
  rows = [
    {"t1", "codex", ~s({"threadId":"codex-1"})},
    {"t2", "cursor", ~s({"cursorChatId":"cursor-1"})},
    {"t3", "opencode", ~s({"sessionId":"oc-1"})}
  ]

  for {thread_id, provider, cursor} <- rows do
    :ok = Storage.upsert_binding(thread_id, provider, cursor)
  end

  # Each distinct thread keeps its own provider tag.
  for {thread_id, provider, _cursor} <- rows do
    assert Storage.get_binding(thread_id).provider == provider
  end
end

# --- Integration: SnapshotServer recovery ---

describe "SnapshotServer recovery" do
Expand Down
Loading