feat: WebSocket session API for interactive agent steering#455
feat: WebSocket session API for interactive agent steering#455danieliser wants to merge 39 commits intoalibaba:mainfrom
Conversation
…t of dependency on global locks.
…ession-terminated error
- .github/workflows/ci.yml: matrix build/vet/test on Go 1.21 and 1.22, working-directory components/execd, race detector enabled (60s timeout) - FORK.md: documents fork purpose (PAOP WebSocket steering), upstream sync instructions, and which phases are upstream candidates vs. PAOP-only Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
A3: rename sentinel constants with nonce suffix to prevent collision
with user code printing those strings (corrupts env/exit parsing)
A4: set cmd.Dir = cwd before cmd.Start() so the bash process starts
in the correct OS-level working directory
A5: allow io.EOF in RunInSession bindJSON so empty POST body falls
through to validate() and returns a clear 'code is required' error
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
A1: Add closing bool field to bashSession struct; set s.closing = true in close() before reading pid so any concurrent trackCurrentProcess() call sees the flag and immediately SIGKILLs the new process group instead of storing a pid that will never be reaped. A2: Replace cmd.Stderr = cmd.Stdout (which merged stderr into the stdout pipe, silencing OnExecuteStderr) with two explicit io.Pipe() instances. A parallel goroutine drains stderrR and fires OnExecuteStderr per line; write ends are closed after cmd.Wait() to unblock the goroutine cleanly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add replayBuffer (1 MiB circular) in pkg/runtime/replay_buffer.go - Wire replay.write() into stdout and stderr scanner goroutines in bashSession - Add GET /session/:id returning SessionStatusResponse with output_offset - Add ?since=<offset> support on POST /session/:id/run (SSE replay event) - Add ReplaySessionOutput and GetBashSessionStatus to runtime.Controller - Add StreamEventTypeReplay constant to SSE event model - 5 unit tests: write/read, circular eviction, caught-up offset, large gap, concurrent Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements bidirectional WebSocket steering for bash sessions: - New GET /ws/session/:sessionId endpoint (registered under /ws group) - pkg/web/model/session_ws.go: ClientFrame/ServerFrame types + WSErrCode constants - pkg/web/controller/session_ws.go: SessionWebSocket handler - HTTP 404 before upgrade for unknown sessions - Exclusive WS lock (HTTP 409 if already connected) - gorilla/websocket upgrade with CheckOrigin: allow all (container env) - Start() launches interactive bash if not already running - ?since=<offset> replay from replayBuffer before entering live stream - connected frame immediately after upgrade - Write-serialized stdout/stderr scanner goroutines → replay buffer + WS frames - Exit watcher: closes doneCh → exit frame with code - RFC 6455 ping/pong keepalive (30s interval, 60s read deadline) - Application-level ping/pong frame support - stdin/signal/resize frame handling - pkg/runtime/types.go: BashSession interface + WS fields on bashSession struct - pkg/runtime/bash_session.go: Start() (interactive bash with os.Pipe), LockWS/UnlockWS, IsRunning, ExitCode, StdoutPipe, StderrPipe, Done, SendSignal, signalByName, WriteSessionOutput on Controller - pkg/runtime/bash_session_windows.go: Windows stubs for new Controller methods - pkg/runtime/ctrl.go: GetBashSession returning BashSession interface - 5 tests: ConnectUnknownSession, PingPong, StdinForwarding, ReplayOnConnect, ExitFrame Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add github.com/creack/pty dependency (upgraded from v1.1.9 to v1.1.24) - Add isPTY/ptmx fields to bashSession struct - Add StartPTY() method: launches bash via pty.StartWithSize, sets TERM=xterm-256color, COLUMNS=80, LINES=24 in env, merges stdout+stderr on ptmx; pipe mode (Start()) unchanged - Add ResizePTY(cols, rows) method: calls pty.Setsize; no-op if not PTY mode - Override WriteStdin() to write to ptmx in PTY mode, stdin pipe otherwise - Update close() to close ptmx on session teardown - Wire ?pty=1 query param in GET /ws/session/:sessionId WS handler: calls StartPTY() when pty=1, Start() otherwise; sends mode='pty'/'pipe' in connected frame; skips stderr goroutine in PTY mode; handles resize frames by calling ResizePTY() - Add BashSession interface methods: StartPTY() and ResizePTY() - Add 4 PTY tests: BasicExecution, ResizeUpdatesWinsize, AnsiSequencesPresent, PipeModeUnchanged Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
P1 (session_ws.go): Derive connected.mode from session.IsPTY() instead of the ?pty query parameter. Reconnecting with a mismatched ?pty value no longer reports the wrong mode or skips the stderr pump. P1 (bash_session.go): Reset isPTY and ptmx in Start() before assigning pipe descriptors. Reusing a session that previously ran in PTY mode no longer leaves WriteStdin targeting a stale PTY fd. P2 (session_ws.go): Set a 16 MiB scanner buffer on both stdout and stderr pumps (matching the run() scanner) and handle scanner.Err() explicitly. Long lines (JSON/base64 tool output) no longer silently terminate the pump. On scan error a RUNTIME_ERROR frame is sent to the client. Also adds IsPTY() to the BashSession interface and WSErrCodeRuntimeError to the model constants. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 51128dc725
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
P1 (bash_session.go): Separate wsPid from currentProcessPid. The long-lived WS shell (Start/StartPTY) now tracks its PID in wsPid; run() continues to use currentProcessPid. Prevents concurrent run() calls from clobbering WS shell tracking, keeping IsRunning, close, SendSignal, and interrupt logic correct. P1 (session_ws.go + types): Add CloseOutputPipes() to BashSession interface. Handler calls it via defer-cancel so stdout/stderr scanner goroutines unblock immediately on disconnect rather than blocking on the shared pipe until the next line of output arrives. Prevents goroutine leaks and nondeterministic output loss on reconnect. P2 (bash_session.go): Add defer os.Remove(scriptPath) in run() so temporary wrapper scripts are cleaned up after each execution instead of accumulating in /tmp. P2 (session_ws.go): Gate resize frame handling on session.IsPTY() instead of the request ?pty query flag, consistent with mode/stderr decisions from the previous round. Also: clear isPTY/ptmx in the StartPTY exit goroutine so a subsequent Start() in pipe mode finds a clean slate without needing to wait for the handler to race the reset. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
commandSnapshot copies *commandKernel without any mutex, but markCommandFinished still writes the same struct fields under c.mu; polling status while a command is finishing can therefore race on running/exitCode/finishedAt. This can produce inconsistent status responses and introduces a data race that wasn't present when snapshotting was guarded by the RW lock.
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
Revert CloseOutputPipes approach: closing the session's real stdout/stderr pipe readers on WS disconnect broke reconnects — the shell stays alive with wsPid set, so Start() is skipped, but the new handler is attached to dead descriptors causing SIGPIPE/HUP and silent output loss. Scanner goroutines already exit naturally when writeJSON fails after the connection drops; the defer cancel() is sufficient. Fix commandSnapshot data race: commandSnapshot was copying *kernel without holding c.mu while markCommandFinished writes the same struct fields under c.mu (write lock). Concurrent status polls during command exit could observe a torn struct. commandSnapshot now takes c.mu.RLock() around the copy, consistent with the existing write-lock discipline in markCommandFinished. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: cd3ccafd6e
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
…connect Root problem: scanner goroutines block in Scan() on the shared OS pipe. Closing the WS connection cancels ctx but doesn't unblock Scan(), so goroutines outlived UnlockWS() — a reconnecting client could start new scanners while stale ones from the prior connection were still reading. Solution — output broadcast pattern: - Start/StartPTY now launch a broadcast goroutine that reads the real OS pipe and writes into a swappable *io.PipeWriter stored on the session. - AttachOutput() (new BashSession method) creates a fresh io.Pipe pair per connection, swaps in the PipeWriter, and returns the PipeReader plus a detach() func that closes the PipeWriter. - The WS handler defers: cancel() → detach() → pumpWg.Wait() → UnlockWS() in that order. detach() closes the PipeWriter (EOF to the scanner), pumpWg.Wait() blocks until scanner goroutines exit, then UnlockWS() fires — guaranteeing no stale goroutine is reading when a new client arrives. Also fixes GetBashSessionStatus to derive Running from wsPid != 0 instead of session.started, so the field accurately reflects whether the WS shell is currently alive rather than whether it was ever started. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2f161bc466
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
P0 (bash_session.go run()): Stdout scanner blocked waiting for EOF on stdoutR, which required stdoutW to be closed, which only happened after cmd.Wait(), which itself was waiting for stdout to be drained — a deadlock. Fix: close stdoutW and stderrW immediately after cmd.Start() so the process's output buffers are already closed in the parent before scanning begins. Wait() is then called after scanning without blocking. P1 (session_ws.go): WS lock (wsConnected) was not released when the WebSocket upgrade handshake failed. Any subsequent connection attempt returned "session already connected" until process restart. Fix: call session.UnlockWS() in the upgrade error path. P1 (bash_session.go broadcast goroutines): Output produced while no client is attached (between disconnect and reconnect) was discarded — the broadcast goroutine only wrote to the per-connection PipeWriter and skipped the replay buffer. Reconnecting clients using ?since= would miss those bytes entirely. Fix: always write to s.replay before fanout, independent of whether a PipeWriter is active. Handler pumps no longer call WriteSessionOutput (replay is now the broadcast goroutine's job). Also updates bash_session_pty_test.go to use AttachOutput() after the removal of the StdoutPipe/StderrPipe interface methods. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0bb16a7308
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
P0 (bash_session.go run()): Previous fix closed io.PipeWriters immediately after cmd.Start(), but exec's internal copy goroutines still write into those writers after Start() returns — closing them early caused "write on closed pipe" errors and broken output. Switch from io.Pipe to os.Pipe: the kernel owns the buffer, so closing the parent-side write fd after Start() is safe — the child holds its own fd copy and the reader sees EOF only when the child exits. Eliminates the scan↔Wait deadlock cleanly without breaking writes. P1 (session_ws.go): WS lock (wsConnected) was not released when Start()/StartPTY() returned an error. The deferred cleanup block is installed later, so the early return bypassed UnlockWS(). Fixed by calling session.UnlockWS() explicitly before the error return. P2 (replay_buffer.go): GetBashSessionStatus read session.replay.total directly without holding the replay mutex, creating a data race with concurrent write() calls. Added replayBuffer.Total() accessor (mutex-guarded) and updated the status call to use it. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c4214b1f55
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
bufio.Scanner buffers until newline, which delays or drops PTY prompts, progress spinners, and cursor-control sequences that never emit \n. Replace stdout/stderr scanners with raw 32 KiB chunk reads via io.Read so all output is forwarded immediately without line-boundary buffering. Also fix TestPTY_PipeModeUnchanged: poll replay buffer instead of blocking on PipeReader, which was racy when output arrived before AttachOutput installed the PipeWriter. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: de472b6a01
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
- RunInSession: map ErrContextNotFound → HTTP 404 so clients can distinguish missing sessions from server errors and apply correct retry/recreate logic - SessionWebSocket exit watcher: send CloseMessage + conn.Close() after the exit frame so ReadJSON unblocks immediately instead of waiting up to 60s for the read deadline; eliminates the 'session already connected' false positive window during reconnect after process exit Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d9c68bb741
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
…errors - bash_session.go: close stdoutR/stderrR in broadcast goroutines (pipe mode) and stdoutR/stderrR in run() after consumers finish — every interactive session and every RunInBashSession call was leaking two OS file descriptors until GC - session_ws.go: move AttachOutput() before ReplaySessionOutput() so the live pipe sink is installed before the replay snapshot is taken; eliminates the window where bytes produced between snapshot and attach were delivered to neither the replay frame nor the live stream - session_ws.go: report PTY resize errors to the client as WS error frames instead of silently discarding them - codeinterpreting.go: validate session existence via ReplaySessionOutput before calling setupSSEResponse(); prevents HTTP 200 being committed before we know whether the session exists, which made 404 unreachable Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 036c508c65
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| case c.getBashSession(sessionID) != nil: | ||
| return c.closeBashSession(sessionID) |
There was a problem hiding this comment.
Keep bash session alive when handling Interrupt
This branch makes Interrupt call closeBashSession, which deletes the bash session from the controller map instead of only stopping the running process. In practice, calling /code interrupt against a bash session now invalidates the session ID entirely, so subsequent /session/:id/run or /ws/session/:id calls fail with not found rather than allowing continued use of the same session (unlike Jupyter/command interrupt behavior).
Useful? React with 👍 / 👎.
| stdout, stderr, detach := session.AttachOutput() | ||
| var pumpWg sync.WaitGroup |
There was a problem hiding this comment.
Prevent replay reconnect from emitting duplicate output
AttachOutput is installed before replay is read, but the stdout/stderr pumps are only started later, so output produced during this window is both included in the replay payload and then delivered again from the attached live pipe. Reconnecting clients using ?since= against a still-running noisy process will therefore see duplicated chunks, which can corrupt accumulated terminal/output state.
Useful? React with 👍 / 👎.
components/execd/pkg/runtime/ctrl.go
Outdated
| // GetBashSession retrieves a bash session by ID. Returns nil if not found. | ||
| func (c *Controller) GetBashSession(sessionID string) BashSession { | ||
| s := c.getBashSession(sessionID) |
There was a problem hiding this comment.
Gate GetBashSession to non-Windows builds
This method is compiled on all platforms and calls the non-Windows helper getBashSession, while bash_session_windows.go also defines GetBashSession. On GOOS=windows, the package now has conflicting GetBashSession methods (and no non-Windows helper), so Windows builds fail to compile.
Useful? React with 👍 / 👎.
- Move GetBashSession from ctrl.go (no build tag) to bash_session.go (!windows build tag) to eliminate the duplicate method definition that breaks Windows compilation - Fix replay/attach ordering: snapshot replay buffer BEFORE AttachOutput so bytes produced between snapshot and attach land in the live pipe only; previously AttachOutput was called first, causing those bytes to appear in both the replay frame and the live stream (duplicate output) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6d32a7b108
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| replayData, replayNextOffset, _ = codeRunner.ReplaySessionOutput(sessionID, since) | ||
| } | ||
| } | ||
|
|
||
| stdout, stderr, detach := session.AttachOutput() |
There was a problem hiding this comment.
Attach output pipes before reading replay backlog
On reconnects with ?since=..., this code snapshots replay bytes before installing the live output sink, which creates a loss window for active sessions: bashSession broadcasters always write to the replay ring first and only forward to stdoutW/stderrW when those writers are attached, so bytes emitted between the replay snapshot and AttachOutput() are neither included in the replay frame nor delivered live to this connection. In workloads that continuously stream output, clients can silently miss chunks during reconnect.
Useful? React with 👍 / 👎.
Summary
Adds a WebSocket endpoint to execd bash sessions, enabling interactive steering of long-running processes (e.g. Claude Code agents) without killing and restarting them.
?since=parameter returns buffered output so clients can reconnect without losing historyGET /ws/session/:sessionIdWebSocket endpoint: bidirectional stdin/stdout framing over an existing bash session; pipe mode and PTY mode?pty=1on session creation: full terminal emulation for interactive programs; graceful degradation to pipe mode on platforms where PTY is unavailable (e.g. macOS unsigned binaries)Also includes earlier fixes merged from upstream:
trackCurrentProcesscmd.Dir, empty body guardAPI additions
Frame protocol:
{"type":"connected","mode":"pipe|pty"},{"type":"stdout","data":"..."},{"type":"exit","exit_code":0}{"type":"stdin","data":"..."}GET /ws/session/:id?since=<bytes_written>→{"type":"replay","offset":N}then{"type":"connected",...}Test plan
bash_session_test.go— pipe mode unit tests (session lifecycle, output capture, stdin injection)bash_session_pty_test.go— PTY mode unit tests (skipped on macOS unsigned binaries)session_ws_test.go— WebSocket controller unit tests (connected frame, stdin delivery, exit detection, replay)replay_buffer_test.go— replay buffer offset correctnessNotes
POST /commandendpoint (existing) remains the recommended path for fire-and-forget agent dispatch. WebSocket is for mid-execution steering only.🤖 Generated with Claude Code