Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions apps/harness/lib/harness/session_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ defmodule Harness.SessionManager do
end

defp start_session_with_module(session_module, thread_id, provider, params) do
params = maybe_inject_resume_cursor(params, thread_id, provider)

child_spec =
{session_module,
%{
Expand Down Expand Up @@ -258,6 +260,31 @@ defmodule Harness.SessionManager do
SnapshotServer.apply_event(event)
end

@doc false
# Inject resumeCursor from SQLite binding if one exists and the provider matches.
# If the caller already supplied a resumeCursor, respect it (don't overwrite).
def maybe_inject_resume_cursor(%{"resumeCursor" => _} = params, _thread_id, _provider) do
params
end

def maybe_inject_resume_cursor(params, thread_id, provider) do
case Harness.Storage.get_binding(thread_id) do
%{provider: ^provider, resume_cursor_json: cursor_json} when is_binary(cursor_json) ->
case Jason.decode(cursor_json) do
{:ok, cursor} ->
Logger.info("Injecting resumeCursor from binding for #{thread_id} (#{provider})")
Map.put(params, "resumeCursor", cursor)

_ ->
params
end

_ ->
# No binding, provider mismatch, or nil cursor — start fresh
params
end
end

defp provider_module("codex"), do: {:ok, CodexSession}
defp provider_module("claudeAgent"), do: {:ok, ClaudeSession}
defp provider_module("opencode"), do: {:ok, OpenCodeSession}
Expand Down
77 changes: 77 additions & 0 deletions apps/harness/test/harness/storage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,83 @@ defmodule Harness.StorageTest do
assert Storage.get_binding("t3").provider == "opencode"
end

# --- Resume cursor injection tests (SessionManager.maybe_inject_resume_cursor) ---

alias Harness.SessionManager

test "injects resumeCursor when binding exists and provider matches" do
cursor = Jason.encode!(%{"threadId" => "codex-abc"})
:ok = Storage.upsert_binding("t1", "codex", cursor)

params = %{"threadId" => "t1", "provider" => "codex"}
result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex")

assert result["resumeCursor"] == %{"threadId" => "codex-abc"}
end

test "does NOT inject resumeCursor when provider mismatches" do
cursor = Jason.encode!(%{"threadId" => "codex-abc"})
:ok = Storage.upsert_binding("t1", "codex", cursor)

params = %{"threadId" => "t1", "provider" => "cursor"}
result = SessionManager.maybe_inject_resume_cursor(params, "t1", "cursor")

refute Map.has_key?(result, "resumeCursor")
end

test "does NOT inject resumeCursor when no binding exists" do
params = %{"threadId" => "t1", "provider" => "codex"}
result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex")

refute Map.has_key?(result, "resumeCursor")
end

test "does NOT overwrite caller-supplied resumeCursor" do
cursor = Jason.encode!(%{"threadId" => "codex-abc"})
:ok = Storage.upsert_binding("t1", "codex", cursor)

params = %{"threadId" => "t1", "provider" => "codex", "resumeCursor" => %{"threadId" => "explicit"}}
result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex")

assert result["resumeCursor"] == %{"threadId" => "explicit"}
end

test "does NOT inject resumeCursor when cursor_json is nil" do
:ok = Storage.upsert_binding("t1", "codex", nil)

params = %{"threadId" => "t1", "provider" => "codex"}
result = SessionManager.maybe_inject_resume_cursor(params, "t1", "codex")

refute Map.has_key?(result, "resumeCursor")
end

test "multi-session: concurrent bindings for different threads don't cross-contaminate" do
:ok = Storage.upsert_binding("t1", "codex", Jason.encode!(%{"threadId" => "codex-1"}))
:ok = Storage.upsert_binding("t2", "cursor", Jason.encode!(%{"cursorChatId" => "cursor-1"}))
:ok = Storage.upsert_binding("t3", "opencode", Jason.encode!(%{"sessionId" => "oc-1"}))

r1 = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "codex")
r2 = SessionManager.maybe_inject_resume_cursor(%{}, "t2", "cursor")
r3 = SessionManager.maybe_inject_resume_cursor(%{}, "t3", "opencode")

assert r1["resumeCursor"] == %{"threadId" => "codex-1"}
assert r2["resumeCursor"] == %{"cursorChatId" => "cursor-1"}
assert r3["resumeCursor"] == %{"sessionId" => "oc-1"}
end

test "multi-session: same thread_id with different provider ignores stale binding" do
# Codex wrote a binding for t1
:ok = Storage.upsert_binding("t1", "codex", Jason.encode!(%{"threadId" => "codex-abc"}))

# Cursor tries to start on same thread — should NOT get Codex's cursor
result = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "cursor")
refute Map.has_key?(result, "resumeCursor")

# Codex should still get its own cursor
result = SessionManager.maybe_inject_resume_cursor(%{}, "t1", "codex")
assert result["resumeCursor"] == %{"threadId" => "codex-abc"}
end

# --- Integration: SnapshotServer recovery ---

describe "SnapshotServer recovery" do
Expand Down
13 changes: 9 additions & 4 deletions apps/server/src/provider/Layers/HarnessClientAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -917,10 +917,15 @@ function mapHarnessEventToRuntimeEvents(
return providerMapped;
}

// Silently drop unrecognised events. Provider-specific notifications that
// pass through the GenServer (e.g. Codex app-server lifecycle events) are
// not bugs — they're simply events with no canonical mapping. The raw event
// stream still contains them for debugging.
// Emit a warning for truly unrecognised events so operators can surface
// mapping gaps. Known-quiet events (already handled by codexEventMapping's
// QUIET_UNMAPPED_EVENTS) won't reach here — they return [] from the
// codexMapToRuntimeEvents fallback above.
if (event.method) {
console.debug(
`[mapHarnessEventToRuntimeEvents] unmapped event: ${event.method} (provider: ${event.provider ?? "unknown"})`,
);
}
return [];
}

Expand Down
2 changes: 2 additions & 0 deletions apps/server/src/provider/Layers/codexEventMapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ const QUIET_UNMAPPED_EVENTS = new Set([
"hook_response",
"user",
"thinking",
// Raw equivalents of item/agentMessage/delta (already mapped).
// Mapping these would cause duplicate deltas in the UI.
"codex/event/agent_message_delta",
"codex/event/agent_message",
"codex/event/agent_message_content_delta",
Expand Down