-
Notifications
You must be signed in to change notification settings - Fork 0
feat(harness): SQLite bindings table for thread resume #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
8784724
daf6aab
4b79399
467af49
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,6 +64,21 @@ defmodule Harness.Storage do | |
| GenServer.call(__MODULE__, {:get_pending_requests, thread_id}) | ||
| end | ||
|
|
||
| @doc "Upsert a binding (resume cursor) for a thread. Opaque pass-through — harness never parses resume_cursor_json." | ||
| def upsert_binding(thread_id, provider, resume_cursor_json) do | ||
| GenServer.call(__MODULE__, {:upsert_binding, thread_id, provider, resume_cursor_json}) | ||
| end | ||
|
|
||
| @doc "Get the binding for a thread. Returns %{thread_id, provider, resume_cursor_json} or nil." | ||
| def get_binding(thread_id) do | ||
| GenServer.call(__MODULE__, {:get_binding, thread_id}) | ||
| end | ||
|
|
||
| @doc "Delete a binding. Called when resume fails non-recoverably (before fresh start)." | ||
| def delete_binding(thread_id) do | ||
| GenServer.call(__MODULE__, {:delete_binding, thread_id}) | ||
| end | ||
|
|
||
| @doc "Truncate all tables. Test-only." | ||
| def reset! do | ||
| GenServer.call(__MODULE__, :reset) | ||
|
|
@@ -131,7 +146,23 @@ defmodule Harness.Storage do | |
| {:reply, result, state} | ||
| end | ||
|
|
||
| def handle_call({:upsert_binding, thread_id, provider, resume_cursor_json}, _from, %{conn: conn} = state) do | ||
| result = do_upsert_binding(conn, thread_id, provider, resume_cursor_json) | ||
| {:reply, result, state} | ||
| end | ||
|
|
||
| def handle_call({:get_binding, thread_id}, _from, %{conn: conn} = state) do | ||
| result = do_get_binding(conn, thread_id) | ||
| {:reply, result, state} | ||
| end | ||
|
|
||
| def handle_call({:delete_binding, thread_id}, _from, %{conn: conn} = state) do | ||
| result = do_delete_binding(conn, thread_id) | ||
| {:reply, result, state} | ||
| end | ||
|
|
||
| def handle_call(:reset, _from, %{conn: conn} = state) do | ||
| Sqlite3.execute(conn, "DELETE FROM harness_bindings") | ||
| Sqlite3.execute(conn, "DELETE FROM harness_pending_requests") | ||
| Sqlite3.execute(conn, "DELETE FROM harness_events") | ||
| Sqlite3.execute(conn, "DELETE FROM harness_sessions") | ||
|
|
@@ -230,6 +261,21 @@ defmodule Harness.Storage do | |
| CREATE INDEX IF NOT EXISTS idx_pending_reqs_thread | ||
| ON harness_pending_requests(thread_id) | ||
| """) | ||
|
|
||
| # Bindings: provider-specific resume cursors that survive session close. | ||
| # Rows are UPSERTed on thread/start or thread/resume success. | ||
| # Deleted only on non-recoverable resume failure (before fresh start). | ||
| # Expected provider values: "codex", "cursor", "opencode" | ||
| # (Claude uses Node SDK directly, not the harness.) | ||
| Sqlite3.execute(conn, """ | ||
| CREATE TABLE IF NOT EXISTS harness_bindings ( | ||
| thread_id TEXT PRIMARY KEY, | ||
| provider TEXT NOT NULL, | ||
|
Comment on lines
+271
to
+273
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Binding keying currently breaks multi-provider coexistence. With 💡 Proposed fix (provider-scoped bindings)- `@doc` "Get the binding for a thread. Returns %{thread_id, provider, resume_cursor_json} or nil."
- def get_binding(thread_id) do
- GenServer.call(__MODULE__, {:get_binding, thread_id})
+ `@doc` "Get the binding for a thread+provider. Returns %{thread_id, provider, resume_cursor_json} or nil."
+ def get_binding(thread_id, provider) do
+ GenServer.call(__MODULE__, {:get_binding, thread_id, provider})
end
- `@doc` "Delete a binding. Called when resume fails non-recoverably (before fresh start)."
- def delete_binding(thread_id) do
- GenServer.call(__MODULE__, {:delete_binding, thread_id})
+ `@doc` "Delete a binding for a thread+provider."
+ def delete_binding(thread_id, provider) do
+ GenServer.call(__MODULE__, {:delete_binding, thread_id, provider})
end- def handle_call({:get_binding, thread_id}, _from, %{conn: conn} = state) do
- result = do_get_binding(conn, thread_id)
+ def handle_call({:get_binding, thread_id, provider}, _from, %{conn: conn} = state) do
+ result = do_get_binding(conn, thread_id, provider)
{:reply, result, state}
end
- def handle_call({:delete_binding, thread_id}, _from, %{conn: conn} = state) do
- result = do_delete_binding(conn, thread_id)
+ def handle_call({:delete_binding, thread_id, provider}, _from, %{conn: conn} = state) do
+ result = do_delete_binding(conn, thread_id, provider)
{:reply, result, state}
end- CREATE TABLE IF NOT EXISTS harness_bindings (
- thread_id TEXT PRIMARY KEY,
+ CREATE TABLE IF NOT EXISTS harness_bindings (
+ thread_id TEXT NOT NULL,
provider TEXT NOT NULL,
resume_cursor_json TEXT,
created_at TEXT NOT NULL,
- updated_at TEXT NOT NULL
+ updated_at TEXT NOT NULL,
+ PRIMARY KEY (thread_id, provider)
)- ON CONFLICT(thread_id) DO UPDATE SET
+ ON CONFLICT(thread_id, provider) DO UPDATE SET
provider = excluded.provider,
resume_cursor_json = excluded.resume_cursor_json,
updated_at = excluded.updated_at- defp do_get_binding(conn, thread_id) do
- sql = "SELECT thread_id, provider, resume_cursor_json FROM harness_bindings WHERE thread_id = ?1"
+ defp do_get_binding(conn, thread_id, provider) do
+ sql = """
+ SELECT thread_id, provider, resume_cursor_json
+ FROM harness_bindings
+ WHERE thread_id = ?1 AND provider = ?2
+ """
- case query_one(conn, sql, [thread_id]) do
+ case query_one(conn, sql, [thread_id, provider]) do
[tid, provider, cursor_json] -> %{thread_id: tid, provider: provider, resume_cursor_json: cursor_json}
nil -> nil
end
end
- defp do_delete_binding(conn, thread_id) do
- sql = "DELETE FROM harness_bindings WHERE thread_id = ?1"
+ defp do_delete_binding(conn, thread_id, provider) do
+ sql = "DELETE FROM harness_bindings WHERE thread_id = ?1 AND provider = ?2"
...
- :ok = Sqlite3.bind(stmt, [thread_id])
+ :ok = Sqlite3.bind(stmt, [thread_id, provider])Also applies to: 451-460, 474-485 🤖 Prompt for AI Agents |
||
| resume_cursor_json TEXT, | ||
| created_at TEXT NOT NULL, | ||
| updated_at TEXT NOT NULL | ||
| ) | ||
| """) | ||
| end | ||
|
|
||
| # --- Event operations --- | ||
|
|
@@ -400,6 +446,53 @@ defmodule Harness.Storage do | |
| |> Enum.map(&row_to_pending_request/1) | ||
| end | ||
|
|
||
| # --- Binding operations --- | ||
|
|
||
| defp do_upsert_binding(conn, thread_id, provider, resume_cursor_json) do | ||
| now = DateTime.utc_now() |> DateTime.to_iso8601() | ||
|
|
||
| sql = """ | ||
| INSERT INTO harness_bindings (thread_id, provider, resume_cursor_json, created_at, updated_at) | ||
| VALUES (?1, ?2, ?3, ?4, ?5) | ||
| ON CONFLICT(thread_id) DO UPDATE SET | ||
| provider = excluded.provider, | ||
| resume_cursor_json = excluded.resume_cursor_json, | ||
| updated_at = excluded.updated_at | ||
| """ | ||
|
|
||
| {:ok, stmt} = Sqlite3.prepare(conn, sql) | ||
|
|
||
| try do | ||
| :ok = Sqlite3.bind(stmt, [thread_id, provider, resume_cursor_json, now, now]) | ||
| :done = Sqlite3.step(conn, stmt) | ||
| :ok | ||
| after | ||
| Sqlite3.release(conn, stmt) | ||
| end | ||
| end | ||
|
|
||
| defp do_get_binding(conn, thread_id) do | ||
| sql = "SELECT thread_id, provider, resume_cursor_json FROM harness_bindings WHERE thread_id = ?1" | ||
|
|
||
| case query_one(conn, sql, [thread_id]) do | ||
| [tid, provider, cursor_json] -> %{thread_id: tid, provider: provider, resume_cursor_json: cursor_json} | ||
| nil -> nil | ||
| end | ||
| end | ||
|
|
||
| defp do_delete_binding(conn, thread_id) do | ||
| sql = "DELETE FROM harness_bindings WHERE thread_id = ?1" | ||
| {:ok, stmt} = Sqlite3.prepare(conn, sql) | ||
|
|
||
| try do | ||
| :ok = Sqlite3.bind(stmt, [thread_id]) | ||
| :done = Sqlite3.step(conn, stmt) | ||
| :ok | ||
| after | ||
| Sqlite3.release(conn, stmt) | ||
| end | ||
| end | ||
|
|
||
| # --- Session queries --- | ||
|
|
||
| defp do_get_all_sessions(conn) do | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.