Add surface.stream command for real-time PTY streaming #2612
mahmudulturan wants to merge 2 commits into manaflow-ai:main
Add a new socket command that takes over the connection for bidirectional
PTY relay. The client sends {"method":"surface.stream"} and receives
base64-encoded terminal output frames while being able to inject input
back into the PTY. This is the cmux-side foundation for remote terminal
streaming to the mobile app.
- Detect surface.stream in handleClient before normal dispatch
- Resolve surface via the standard v2 pattern (workspace/surface_id)
- enterStreamRelay: poll-based loop at ~60fps for PTY tap read/write
- handleStreamInput: decode base64 input frames and write to PTY
- Register in processV2Command switch and capabilities list
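The poll-based loop mentioned in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `waitReadable` is a hypothetical helper name, and the 16 ms default is simply one frame at about 60 fps.

```swift
import Foundation

/// Hypothetical helper: waits up to `timeoutMs` for `fd` to become readable.
/// Returns true if data (or a hangup) is pending, false on timeout or error.
func waitReadable(_ fd: Int32, timeoutMs: Int32 = 16) -> Bool {
    var pfd = pollfd(fd: fd, events: Int16(POLLIN), revents: 0)
    let ready = poll(&pfd, 1, timeoutMs)
    guard ready > 0 else { return false }  // 0 = timeout, -1 = error
    return pfd.revents & Int16(POLLIN | POLLHUP) != 0
}
```

A relay loop built on this would call the helper once per iteration, drain the PTY tap regardless of the result, and read the socket only when it returns true.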
📝 Walkthrough
A new streaming protocol (
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client as Socket Client
participant TermCtrl as TerminalController
participant Surface as Terminal Surface
participant PTY as PTY Device
Client->>TermCtrl: Send surface.stream JSON command
TermCtrl->>TermCtrl: Intercept & parse stream request
TermCtrl->>Surface: Resolve target terminal
TermCtrl->>Client: Send stream-start acknowledgement
TermCtrl->>PTY: Open PTY tap
TermCtrl->>PTY: Read initial screen snapshot
TermCtrl->>Client: Send viewport text as JSON frame
loop Bidirectional Relay
par PTY to Client
PTY->>TermCtrl: Poll for PTY output
TermCtrl->>TermCtrl: Base64 encode output
TermCtrl->>Client: Send output frame (JSON)
and Client to PTY
Client->>TermCtrl: Send input/resize JSON frame
TermCtrl->>TermCtrl: Decode input bytes
TermCtrl->>PTY: Write bytes via pty_write
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Greptile Summary
This PR adds a
Confidence Score: 4/5
Not safe to merge: readScreenSnapshot calls ghostty_surface_read_text off the main thread, which is a data race / potential crash against the existing threading contract.
Two P1 defects remain: off-main-thread ghostty API access (crash risk) and unbounded inputBuffer (OOM risk). Neither is speculative: the threading violation is directly observable by comparing against v2SurfaceReadText, which uses v2MainSync for the same API.
Files needing attention: Sources/TerminalController.swift, specifically enterStreamRelay (readScreenSnapshot threading, inputBuffer cap) and the unchecked fcntl return.
Important Files Changed
Sequence Diagram
sequenceDiagram
participant C as Client
participant SC as handleClient (bg thread)
participant M as Main Thread
participant G as Ghostty PTY
C->>SC: {"method":"surface.stream",...}
SC->>M: v2MainSync: resolve surface ptr
M-->>SC: surfacePtr
SC->>SC: writeSocketResponse(stream:true)
SC->>SC: enterStreamRelay(socket, surface)
SC->>G: ghostty_surface_pty_tap_open()
Note over SC: readScreenSnapshot() ⚠️ called off-main
SC->>G: ghostty_surface_read_text() [off-main!]
G-->>SC: screen text
SC->>C: {"type":"snapshot","data":"..."}
loop ~60 fps relay
SC->>G: ghostty_surface_pty_tap_read()
G-->>SC: PTY bytes (n>0)
SC->>C: {"type":"output","data":"<b64>"}
C->>SC: {"type":"input","data":"<b64>"}
SC->>G: ghostty_surface_pty_write(bytes)
end
C--xSC: disconnect (POLLHUP/read<=0)
SC->>G: ghostty_surface_pty_tap_close()
SC->>SC: restore socket flags
Reviews (1): Last reviewed commit: "feat: send initial screen snapshot befor..."
if let snapshot = readScreenSnapshot(surface: surface) {
    let snapshotData = Data(snapshot.utf8)
    let b64 = snapshotData.base64EncodedString()
    let frame = "{\"type\":\"snapshot\",\"data\":\"\(b64)\"}\n"
    frame.withCString { ptr in
        _ = write(socket, ptr, strlen(ptr))
    }
}
readScreenSnapshot called off the main thread
ghostty_surface_read_text is invoked here from enterStreamRelay, which is nonisolated and runs on the background client thread. Every other call site for this API (e.g., readTerminalTextBase64 called inside v2MainSync {} in v2SurfaceReadText) dispatches to the main thread first. Calling it off-main risks a data race or crash in Ghostty's terminal state machine.
Fix: wrap the call in DispatchQueue.main.sync (mirroring v2MainSync) before entering the relay loop, or restructure so the snapshot is read inside the existing v2MainSync block in v2SurfaceStream and passed in as a parameter.
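One way to picture the suggested fix is a small main-thread hop that returns plain data to the background relay. The sketch below is an assumption-laden illustration: `runOnMain`, `readSnapshotText`, and `captureSnapshotForRelay` are hypothetical names, and the Ghostty call is replaced by a stand-in because its real signature is not shown in this thread.

```swift
import Foundation
import Dispatch

/// Runs `body` on the main thread and returns its result.
/// Calls it directly when already on main, because DispatchQueue.main.sync
/// from the main thread would deadlock.
func runOnMain<T>(_ body: () -> T) -> T {
    if Thread.isMainThread { return body() }
    return DispatchQueue.main.sync(execute: body)
}

/// Hypothetical stand-in for the Ghostty text read; not the real API.
func readSnapshotText() -> String {
    return "prompt$ "
}

/// Reads the snapshot on main before the relay starts, so the background
/// loop only ever touches an immutable String.
func captureSnapshotForRelay() -> String {
    return runOnMain { readSnapshotText() }
}
```

The design point is that the value crossing threads is a copied `String`, not a pointer into terminal state.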
var inBuf = [UInt8](repeating: 0, count: 4096)
let bytesRead = read(socket, &inBuf, inBuf.count)
if bytesRead <= 0 { break }

if let str = String(bytes: inBuf[0..<bytesRead], encoding: .utf8) {
    inputBuffer += str
    while let newlineIdx = inputBuffer.firstIndex(of: "\n") {
        let line = String(inputBuffer[..<newlineIdx])
        inputBuffer = String(inputBuffer[inputBuffer.index(after: newlineIdx)...])
        handleStreamInput(line, surface: surface)
    }
}
inputBuffer accumulates raw bytes from the socket and is only drained when a \n delimiter is found. A client that sends a large payload (or a corrupt/malicious stream) without a newline will grow this buffer without bound until the connection closes or the process OOMs. Add a hard cap and break the loop when it is exceeded:
inputBuffer += str
if inputBuffer.count > 1_048_576 { break } // 1 MB safety cap
while let newlineIdx = inputBuffer.firstIndex(of: "\n") {

let origFlags = fcntl(socket, F_GETFL, 0)
fcntl(socket, F_SETFL, origFlags | O_NONBLOCK)
fcntl(socket, F_GETFL, 0) returns -1 on failure. Passing -1 | O_NONBLOCK (which is still -1, every bit set) to F_SETFL then requests every status flag rather than just O_NONBLOCK, which is not what was intended. The return value should be validated before use:
- let origFlags = fcntl(socket, F_GETFL, 0)
- fcntl(socket, F_SETFL, origFlags | O_NONBLOCK)
+ let origFlags = fcntl(socket, F_GETFL, 0)
+ guard origFlags != -1 else {
+     ghostty_surface_pty_tap_close(surface)
+     return
+ }
+ fcntl(socket, F_SETFL, origFlags | O_NONBLOCK)
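The same validation can be packaged as a pair of helpers so that the save and restore steps cannot drift apart. This is a sketch only; `makeNonBlocking` and `restoreFlags` are hypothetical names, not functions from the PR.

```swift
import Foundation

/// Switches `fd` to non-blocking mode.
/// Returns the original flags so the caller can restore them later, or nil
/// if either fcntl call failed.
func makeNonBlocking(_ fd: Int32) -> Int32? {
    let original = fcntl(fd, F_GETFL, 0)
    guard original != -1 else { return nil }
    guard fcntl(fd, F_SETFL, original | O_NONBLOCK) != -1 else { return nil }
    return original
}

/// Restores flags previously returned by `makeNonBlocking`.
func restoreFlags(_ fd: Int32, _ flags: Int32) {
    _ = fcntl(fd, F_SETFL, flags)
}
```

A caller would bail out of the relay (closing the PTY tap first) when `makeNonBlocking` returns nil, and call `restoreFlags` on every exit path otherwise.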
Actionable comments posted: 1
🧹 Nitpick comments (2)
Sources/TerminalController.swift (2)
6327-6329: Resize handling is stubbed out.
The "resize" message type is accepted but not implemented. If this is intentional for a future PR, consider adding a TODO comment or logging to indicate it's not yet functional.
  case "resize":
- // Future: handle terminal resize
+ // TODO(surface.stream): Implement terminal resize via ghostty_surface_resize
+ // Expected format: {"type":"resize","cols":80,"rows":24}
  break
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Sources/TerminalController.swift` around lines 6327 - 6329, The "resize" branch in TerminalController's message handling (the case "resize") is a stub; replace the bare break with a clear TODO and/or runtime logging so callers know resize is unimplemented: update the case "resize" in TerminalController to add a TODO comment and emit a log (e.g., processLogger.info or OSLog) stating "resize not implemented" (or similar) so it's explicit at runtime and in source that resize handling is pending.
6282-6289: Consider handling partial writes to prevent frame corruption.
The write() calls at lines 6259 and 6287 ignore return values. While unlikely for small JSON frames over Unix sockets, partial writes would corrupt the newline-delimited framing. A retry loop or buffered write helper would be more robust.
♻️ Suggested helper pattern
+ /// Writes all bytes to the socket, retrying on partial writes.
+ private nonisolated func writeAll(_ socket: Int32, _ data: String) {
+     data.withCString { ptr in
+         var total = 0
+         let len = strlen(ptr)
+         while total < len {
+             let n = write(socket, ptr.advanced(by: total), len - total)
+             if n <= 0 { break }
+             total += n
+         }
+     }
+ }
Then replace:
- frame.withCString { ptr in
-     _ = write(socket, ptr, strlen(ptr))
- }
+ writeAll(socket, frame)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@Sources/TerminalController.swift` around lines 6282 - 6289, The write(...) calls that send JSON frames (the one inside the n > 0 branch creating frame and the earlier write at the other output site) ignore the return value and can produce partial writes; replace them with a small buffered-write helper (e.g., writeFully(socket: Int32, buffer: UnsafePointer<CChar>, length: Int) -> Bool) that checks for -1/EINTR, advances the pointer by the number of bytes written, retries until all bytes are written or an irrecoverable error occurs, and returns success/failure; call that helper wherever you currently call write(socket, ptr, strlen(ptr)) so newline-delimited frames are sent atomically.
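A variant of the write helper that also covers EINTR and EAGAIN, and reports failure to the caller, might look like the sketch below. `writeFully`, the 100 ms poll timeout, and the `maxStalls` limit are illustrative assumptions rather than anything specified in the review.

```swift
import Foundation

/// Writes every byte of `frame` to `fd`. Retries on short writes and EINTR;
/// on EAGAIN (non-blocking socket, full buffer) waits for writability.
/// Returns false on a hard error or after `maxStalls` consecutive stalls.
func writeFully(_ fd: Int32, _ frame: String, maxStalls: Int = 50) -> Bool {
    let bytes = Array(frame.utf8)
    var sent = 0
    var stalls = 0
    while sent < bytes.count {
        // Capture errno inside the closure, right after write(2).
        let (n, err) = bytes[sent...].withUnsafeBytes { buf -> (Int, Int32) in
            let written = write(fd, buf.baseAddress, buf.count)
            return (written, errno)
        }
        if n > 0 {
            sent += n
            stalls = 0
        } else if n < 0 && err == EINTR {
            continue
        } else if n < 0 && (err == EAGAIN || err == EWOULDBLOCK) {
            stalls += 1
            if stalls > maxStalls { return false }
            var pfd = pollfd(fd: fd, events: Int16(POLLOUT), revents: 0)
            _ = poll(&pfd, 1, 100)  // wait up to 100 ms for buffer space
        } else {
            return false
        }
    }
    return true
}
```

Returning a Bool lets the relay loop treat a failed write the same way as a disconnect instead of continuing with a desynchronised stream.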
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@Sources/TerminalController.swift`:
- Around line 6191-6212: The captured raw pointer surfacePtr can dangle if the
TerminalPanel is closed while enterStreamRelay is running; fix by holding a
strong reference to the owning object (e.g., capture/preserve TerminalPanel or
its TerminalSurface instance) for the lifetime of the relay and/or add a
cancellation signal checked inside enterStreamRelay. Concretely, stop passing
the raw ghostty_surface_t alone from the block that sets surfacePtr and instead
retain a strong reference to the TerminalPanel/TerminalSurface (the same
instance used to call surface.teardownSurface() in TerminalPanel.close()),
update enterStreamRelay to accept that object or a CancellationToken, and ensure
enterStreamRelay watches for panel removal or the token to exit and release the
surface before calling ghostty_surface_pty_tap_read().
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c2797daa-14f4-457f-9d37-a2302da64e0e
📒 Files selected for processing (1)
Sources/TerminalController.swift
        return
    }
    surfacePtr = terminalPanel.surface.surface
}

if let errorResponse {
    writeSocketResponse(errorResponse, to: socket)
    return
}

guard let surface = surfacePtr else {
    let err = v2Error(id: id, code: "not_found", message: "Terminal surface not ready")
    writeSocketResponse(err, to: socket)
    return
}

// Send stream-start acknowledgement
let startMsg = v2Ok(id: id, result: ["stream": true])
writeSocketResponse(startMsg, to: socket)

// Enter the blocking relay loop (runs on the current background thread)
enterStreamRelay(socket: socket, surface: surface)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for existing patterns handling surface lifetime in streaming/long-running contexts
rg -n -C5 'ghostty_surface.*close|surface.*invalidat|surface.*cancel' --type swift
Repository: manaflow-ai/cmux
Length of output: 1919
🏁 Script executed:
#!/bin/bash
# Get the full v2SurfaceStream function (starting before line 6151)
sed -n '6140,6230p' Sources/TerminalController.swift | cat -n
Repository: manaflow-ai/cmux
Length of output: 4670
🏁 Script executed:
#!/bin/bash
# Search for enterStreamRelay implementation
rg -n 'func enterStreamRelay' -A 50 Sources/TerminalController.swift | head -100
Repository: manaflow-ai/cmux
Length of output: 2540
🏁 Script executed:
#!/bin/bash
# Check if there's any reference holding or cancellation token for streaming
rg -n 'streaming|stream.*cancel|stream.*invalid' -B 2 -A 2 Sources/TerminalController.swift | head -80
Repository: manaflow-ai/cmux
Length of output: 1118
🏁 Script executed:
#!/bin/bash
# Check how TerminalPanel is managed in workspace (is it removed on close?)
rg -n 'terminalPanel|removePanel|removeTerminal' Sources/TerminalController.swift | head -30
Repository: manaflow-ai/cmux
Length of output: 2713
🏁 Script executed:
#!/bin/bash
# Check if there's a reference to the streaming socket/relay in TerminalPanel
rg -n 'class TerminalPanel|struct TerminalPanel' -A 30 Sources/GhosttyTerminalView.swift | head -60
Repository: manaflow-ai/cmux
Length of output: 42
🏁 Script executed:
#!/bin/bash
# Search for panel lifecycle and what holds it alive
rg -n 'workspace\.panels|panels\[' Sources/TerminalController.swift | head -20
Repository: manaflow-ai/cmux
Length of output: 1724
🏁 Script executed:
#!/bin/bash
# Search for panel removal/closing patterns
rg -n 'panels\.removeValue|panels\[.*\] = nil|removePanel|closePanel' Sources/TerminalController.swift | head -20
Repository: manaflow-ai/cmux
Length of output: 470
🏁 Script executed:
#!/bin/bash
# Find TerminalPanel class definition
fd -e swift -e swiftui | xargs grep -l 'class TerminalPanel\|struct TerminalPanel' 2>/dev/null
Repository: manaflow-ai/cmux
Length of output: 133
🏁 Script executed:
#!/bin/bash
# Check what reference v2SurfaceStream passes to enterStreamRelay
# Does it keep any strong reference beyond the raw pointer?
sed -n '6150,6250p' Sources/TerminalController.swift | grep -A 5 -B 5 'enterStreamRelay'
Repository: manaflow-ai/cmux
Length of output: 1348
🏁 Script executed:
#!/bin/bash
# Read TerminalPanel class definition
wc -l Sources/Panels/TerminalPanel.swift
Repository: manaflow-ai/cmux
Length of output: 98
🏁 Script executed:
#!/bin/bash
# Check if TerminalPanel has any streaming-aware reference management
rg -n 'stream|surface\.surface|ghostty_surface_t' Sources/Panels/TerminalPanel.swift | head -20
Repository: manaflow-ai/cmux
Length of output: 175
🏁 Script executed:
#!/bin/bash
# Check what happens when a panel is closed - is there streaming cleanup?
rg -n 'func closePanel' -A 30 Sources/TerminalController.swift | head -60
Repository: manaflow-ai/cmux
Length of output: 42
🏁 Script executed:
#!/bin/bash
# Read full TerminalPanel.swift
cat -n Sources/Panels/TerminalPanel.swift
Repository: manaflow-ai/cmux
Length of output: 11851
🏁 Script executed:
#!/bin/bash
# Search for closePanel implementation across all files
rg -n 'func closePanel' -A 20 Sources/ | head -80
Repository: manaflow-ai/cmux
Length of output: 5789
Surface pointer becomes dangling if terminal panel closes during active streaming.
The raw ghostty_surface_t pointer is captured at line 6193 and passed to enterStreamRelay, which runs an indefinite blocking relay loop on a background thread. If the user closes the terminal panel while streaming is active, the TerminalPanel will be removed from workspace.panels and deallocated. This triggers TerminalPanel.close() → surface.teardownSurface(), freeing the Ghostty surface. The relay loop then calls ghostty_surface_pty_tap_read() on a dangling pointer, causing undefined behavior or a crash.
Hold a strong reference to TerminalPanel (or TerminalSurface) for the duration of the relay, or implement a cancellation mechanism (e.g., subscribe to panel removal and signal the relay to exit cleanly).
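The cancellation half of that suggestion could be sketched as a small thread-safe flag that the relay checks on every iteration. `StreamCancellation` and `runRelay` are hypothetical names introduced for illustration, and `step` stands in for one poll/read/write pass. The flag alone does not keep the surface alive: teardown would still need to wait for the relay to exit, or the relay would need to hold a strong reference to the owning object.

```swift
import Foundation

/// Thread-safe one-way flag: set when the panel closes, polled by the
/// relay loop on its background thread.
final class StreamCancellation {
    private let lock = NSLock()
    private var cancelled = false

    func cancel() {
        lock.lock()
        defer { lock.unlock() }
        cancelled = true
    }

    var isCancelled: Bool {
        lock.lock()
        defer { lock.unlock() }
        return cancelled
    }
}

/// Relay skeleton: `step` performs one iteration and returns false when the
/// client disconnects. Returns the number of completed iterations.
func runRelay(token: StreamCancellation, step: () -> Bool) -> Int {
    var iterations = 0
    while !token.isCancelled {
        guard step() else { break }
        iterations += 1
    }
    return iterations
}
```

Checking the token before each step means no PTY tap call is issued after cancellation has been observed.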
2 issues found across 1 file
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="Sources/TerminalController.swift">
<violation number="1" location="Sources/TerminalController.swift:6259">
P2: Non-blocking socket writes ignore partial/EAGAIN results, so JSON frames can be truncated and desync the stream protocol.</violation>
<violation number="2" location="Sources/TerminalController.swift:6295">
P2: Non-blocking read() treats transient errors (EAGAIN/EWOULDBLOCK/EINTR) as disconnects, so the stream can terminate unexpectedly. Handle these errno cases and continue instead of breaking.</violation>
</file>
if pfd.revents & Int16(POLLIN) != 0 {
    var inBuf = [UInt8](repeating: 0, count: 4096)
    let bytesRead = read(socket, &inBuf, inBuf.count)
    if bytesRead <= 0 { break }
P2: Non-blocking read() treats transient errors (EAGAIN/EWOULDBLOCK/EINTR) as disconnects, so the stream can terminate unexpectedly. Handle these errno cases and continue instead of breaking.
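The distinction this comment draws can be made explicit with a small classifier that the loop switches on. The names `ReadOutcome` and `classifyRead` are hypothetical; the sketch assumes the caller captures errno immediately after read(2).

```swift
import Foundation

enum ReadOutcome: Equatable {
    case data(Int)  // bytes were read
    case retry      // transient condition: go back to poll()
    case closed     // EOF or hard error: leave the relay loop
}

/// Classifies the result of a non-blocking read(2). `err` is the errno
/// value captured right after the call.
func classifyRead(result: Int, err: Int32) -> ReadOutcome {
    if result > 0 { return .data(result) }
    if result == 0 { return .closed }  // peer closed the connection
    if err == EAGAIN || err == EWOULDBLOCK || err == EINTR { return .retry }
    return .closed
}
```

In the relay, `.retry` would map to `continue` and only `.closed` to `break`.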
let b64 = snapshotData.base64EncodedString()
let frame = "{\"type\":\"snapshot\",\"data\":\"\(b64)\"}\n"
frame.withCString { ptr in
    _ = write(socket, ptr, strlen(ptr))
P2: Non-blocking socket writes ignore partial/EAGAIN results, so JSON frames can be truncated and desync the stream protocol.
Summary
What changed?
Added a new surface.stream socket command that enables real-time bidirectional PTY streaming over the existing Unix socket. When a client sends surface.stream, the connection switches from request-response to a persistent streaming relay:
- {"type":"output","data":"<b64>"}
- {"type":"input","data":"<b64>"}
- {"type":"snapshot","data":"<b64>"} on connect so clients see existing terminal state
The streaming loop runs at ~60fps using poll() with a 16 ms timeout.
Why?
Addresses long-standing feature requests for remote terminal access:
This gives any external client (SSH tools, web UIs, mobile apps) the ability to observe and interact with existing cmux terminal sessions in real time, with full ANSI color and escape sequence support.
Protocol
The stream uses the same newline-delimited JSON framing as the existing socket API. A dedicated socket connection is required per stream (one surface per connection).
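As a rough client-side illustration of this framing, the sketch below splits incoming bytes into lines with a size cap and converts between raw bytes and the frame shapes named in this section. Everything here is an assumption for illustration: `LineFramer`, `StreamFrame`, `encodeInputFrame`, and `decodePayload` are hypothetical names, and the 1 MB default cap mirrors the figure suggested in review rather than a protocol requirement.

```swift
import Foundation

/// Accumulates raw socket bytes and yields complete newline-terminated lines.
/// Buffering bytes (not Strings) keeps a chunk boundary from splitting a line.
struct LineFramer {
    private var buffer: [UInt8] = []
    let maxPending: Int

    init(maxPending: Int = 1_048_576) { self.maxPending = maxPending }

    /// Returns the complete lines found so far, or nil if the unterminated
    /// remainder exceeds `maxPending` (the caller should drop the peer).
    mutating func feed(_ chunk: [UInt8]) -> [String]? {
        buffer.append(contentsOf: chunk)
        var lines: [String] = []
        while let nl = buffer.firstIndex(of: 0x0A) {
            lines.append(String(decoding: buffer[..<nl], as: UTF8.self))
            buffer.removeSubrange(...nl)
        }
        return buffer.count > maxPending ? nil : lines
    }
}

/// One frame; optional fields are omitted from the JSON when nil.
struct StreamFrame: Codable, Equatable {
    var type: String
    var data: String? = nil     // base64 payload (snapshot/output/input)
    var message: String? = nil  // error text
    var cols: Int? = nil        // resize only
    var rows: Int? = nil
}

/// Builds an input frame (client to cmux) from raw bytes, newline included.
func encodeInputFrame(_ bytes: [UInt8]) throws -> String {
    let frame = StreamFrame(type: "input", data: Data(bytes).base64EncodedString())
    let json = try JSONEncoder().encode(frame)
    return String(decoding: json, as: UTF8.self) + "\n"
}

/// Decodes one received line and returns its raw payload bytes, if any.
func decodePayload(_ line: String) throws -> [UInt8]? {
    let frame = try JSONDecoder().decode(StreamFrame.self, from: Data(line.utf8))
    guard let b64 = frame.data, let raw = Data(base64Encoded: b64) else { return nil }
    return [UInt8](raw)
}
```

A client would feed every socket read into `LineFramer.feed`, pass each returned line to `decodePayload`, and write the decoded bytes to its own terminal emulator.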
Output frames (cmux → client):
{"type":"snapshot","data":"<base64 initial screen content>"}
{"type":"output","data":"<base64 raw PTY bytes with ANSI>"}
{"type":"error","message":"..."}
Input frames (client → cmux):
{"type":"input","data":"<base64 raw bytes>"}
{"type":"resize","cols":80,"rows":24}
Depends on
termio: add PTY output tap for real-time terminal streaming (adds ghostty_surface_pty_tap_open/read/close and ghostty_surface_pty_write C API)
Testing
How did you test this change?
- ./scripts/reload.sh --tag pty-stream --launch
- surface.read_text and surface.send_text still work unchanged
What did you verify manually?
Checklist