-
Notifications
You must be signed in to change notification settings - Fork 938
Add surface.stream command for real-time PTY streaming #2612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1655,6 +1655,21 @@ class TerminalController { | |||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Detect streaming command before normal processing. | ||||||||||||||||||
| // surface.stream takes over the socket connection for bidirectional | ||||||||||||||||||
| // PTY relay, so it must bypass the normal request/response loop. | ||||||||||||||||||
| if trimmed.contains("\"surface.stream\"") { | ||||||||||||||||||
| if let data = trimmed.data(using: .utf8), | ||||||||||||||||||
| let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any], | ||||||||||||||||||
| let method = json["method"] as? String, | ||||||||||||||||||
| method == "surface.stream" { | ||||||||||||||||||
| let params = json["params"] as? [String: Any] ?? [:] | ||||||||||||||||||
| let id = json["id"] | ||||||||||||||||||
| v2SurfaceStream(id: id, params: params, socket: socket) | ||||||||||||||||||
| return // stream took over this connection | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| let response = processCommand(trimmed) | ||||||||||||||||||
| writeSocketResponse(response, to: socket) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
@@ -2387,6 +2402,11 @@ class TerminalController { | |||||||||||||||||
| case "surface.read_text": | ||||||||||||||||||
| return v2Result(id: id, self.v2SurfaceReadText(params: params)) | ||||||||||||||||||
|
|
||||||||||||||||||
| case "surface.stream": | ||||||||||||||||||
| // surface.stream is handled in handleClient before processCommand; | ||||||||||||||||||
| // if we reach here the caller used the wrong transport path. | ||||||||||||||||||
| return v2Error(id: id, code: "invalid_request", | ||||||||||||||||||
| message: "surface.stream must be sent as the first command on a dedicated connection") | ||||||||||||||||||
|
|
||||||||||||||||||
| #if DEBUG | ||||||||||||||||||
| // Debug / test-only | ||||||||||||||||||
|
|
@@ -2515,6 +2535,7 @@ class TerminalController { | |||||||||||||||||
| "surface.read_text", | ||||||||||||||||||
| "surface.clear_history", | ||||||||||||||||||
| "surface.trigger_flash", | ||||||||||||||||||
| "surface.stream", | ||||||||||||||||||
| "pane.list", | ||||||||||||||||||
| "pane.focus", | ||||||||||||||||||
| "pane.surfaces", | ||||||||||||||||||
|
|
@@ -6127,6 +6148,190 @@ class TerminalController { | |||||||||||||||||
| return result | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // MARK: - V2 Surface Stream | ||||||||||||||||||
|
|
||||||||||||||||||
| /// Resolves the target surface, sends a stream-start acknowledgement, and enters | ||||||||||||||||||
| /// a bidirectional PTY relay loop that owns the socket until the client disconnects. | ||||||||||||||||||
| /// Called from `handleClient` *before* the normal command dispatch so it can take | ||||||||||||||||||
| /// over the connection. | ||||||||||||||||||
| private func v2SurfaceStream(id: Any?, params: [String: Any], socket: Int32) { | ||||||||||||||||||
| guard let tabManager = v2ResolveTabManager(params: params) else { | ||||||||||||||||||
| let err = v2Error(id: id, code: "unavailable", message: "TabManager not available") | ||||||||||||||||||
| writeSocketResponse(err, to: socket) | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| var surfacePtr: ghostty_surface_t? = nil | ||||||||||||||||||
| var errorResponse: String? = nil | ||||||||||||||||||
|
|
||||||||||||||||||
| v2MainSync { | ||||||||||||||||||
| guard let ws = v2ResolveWorkspace(params: params, tabManager: tabManager) else { | ||||||||||||||||||
| errorResponse = v2Error(id: id, code: "not_found", message: "Workspace not found") | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| let surfaceId: UUID? | ||||||||||||||||||
| if params["surface_id"] != nil { | ||||||||||||||||||
| surfaceId = v2UUID(params, "surface_id") | ||||||||||||||||||
| guard surfaceId != nil else { | ||||||||||||||||||
| errorResponse = v2Error(id: id, code: "not_found", | ||||||||||||||||||
| message: "Surface not found for the given surface_id") | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| } else { | ||||||||||||||||||
| surfaceId = ws.focusedPanelId | ||||||||||||||||||
| } | ||||||||||||||||||
| guard let surfaceId else { | ||||||||||||||||||
| errorResponse = v2Error(id: id, code: "not_found", message: "No focused surface") | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| guard let terminalPanel = ws.terminalPanel(for: surfaceId) else { | ||||||||||||||||||
| errorResponse = v2Error(id: id, code: "invalid_params", | ||||||||||||||||||
| message: "Surface is not a terminal", | ||||||||||||||||||
| data: ["surface_id": surfaceId.uuidString]) | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| surfacePtr = terminalPanel.surface.surface | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if let errorResponse { | ||||||||||||||||||
| writeSocketResponse(errorResponse, to: socket) | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| guard let surface = surfacePtr else { | ||||||||||||||||||
| let err = v2Error(id: id, code: "not_found", message: "Terminal surface not ready") | ||||||||||||||||||
| writeSocketResponse(err, to: socket) | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Send stream-start acknowledgement | ||||||||||||||||||
| let startMsg = v2Ok(id: id, result: ["stream": true]) | ||||||||||||||||||
| writeSocketResponse(startMsg, to: socket) | ||||||||||||||||||
|
|
||||||||||||||||||
| // Enter the blocking relay loop (runs on the current background thread) | ||||||||||||||||||
| enterStreamRelay(socket: socket, surface: surface) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /// Read the current visible terminal screen as plain text. | ||||||||||||||||||
| /// Used to send an initial snapshot before live PTY streaming begins. | ||||||||||||||||||
| private nonisolated func readScreenSnapshot(surface: ghostty_surface_t) -> String? { | ||||||||||||||||||
| let topLeft = ghostty_point_s( | ||||||||||||||||||
| tag: GHOSTTY_POINT_VIEWPORT, | ||||||||||||||||||
| coord: GHOSTTY_POINT_COORD_TOP_LEFT, | ||||||||||||||||||
| x: 0, y: 0 | ||||||||||||||||||
| ) | ||||||||||||||||||
| let bottomRight = ghostty_point_s( | ||||||||||||||||||
| tag: GHOSTTY_POINT_VIEWPORT, | ||||||||||||||||||
| coord: GHOSTTY_POINT_COORD_BOTTOM_RIGHT, | ||||||||||||||||||
| x: 0, y: 0 | ||||||||||||||||||
| ) | ||||||||||||||||||
| let selection = ghostty_selection_s( | ||||||||||||||||||
| top_left: topLeft, | ||||||||||||||||||
| bottom_right: bottomRight, | ||||||||||||||||||
| rectangle: false | ||||||||||||||||||
| ) | ||||||||||||||||||
| var text = ghostty_text_s() | ||||||||||||||||||
| guard ghostty_surface_read_text(surface, selection, &text) else { return nil } | ||||||||||||||||||
| defer { ghostty_surface_free_text(surface, &text) } | ||||||||||||||||||
| guard let ptr = text.text, text.text_len > 0 else { return nil } | ||||||||||||||||||
| return String(decoding: Data(bytes: ptr, count: Int(text.text_len)), as: UTF8.self) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /// Bidirectional relay between a Unix socket and a Ghostty PTY tap. | ||||||||||||||||||
| /// Sends an initial screen snapshot, then streams live PTY output as | ||||||||||||||||||
| /// base64-encoded JSON frames. Reads JSON input from the socket and writes | ||||||||||||||||||
| /// it into the PTY via `ghostty_surface_pty_write`. | ||||||||||||||||||
| /// Blocks the calling thread until the client disconnects or an error occurs. | ||||||||||||||||||
| private nonisolated func enterStreamRelay(socket: Int32, surface: ghostty_surface_t) { | ||||||||||||||||||
| // Open PTY tap with a 64 KB ring buffer | ||||||||||||||||||
| guard ghostty_surface_pty_tap_open(surface, 65536) else { | ||||||||||||||||||
| let err = "{\"type\":\"error\",\"message\":\"Failed to open PTY tap\"}\n" | ||||||||||||||||||
| err.withCString { ptr in _ = write(socket, ptr, strlen(ptr)) } | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Send initial screen snapshot so the phone sees existing content | ||||||||||||||||||
| if let snapshot = readScreenSnapshot(surface: surface) { | ||||||||||||||||||
| let snapshotData = Data(snapshot.utf8) | ||||||||||||||||||
| let b64 = snapshotData.base64EncodedString() | ||||||||||||||||||
| let frame = "{\"type\":\"snapshot\",\"data\":\"\(b64)\"}\n" | ||||||||||||||||||
| frame.withCString { ptr in | ||||||||||||||||||
| _ = write(socket, ptr, strlen(ptr)) | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Non-blocking socket writes ignore partial/EAGAIN results, so JSON frames can be truncated and desync the stream protocol. Prompt for AI agents |
||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+6254
to
+6261
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Fix: wrap the call in |
||||||||||||||||||
|
|
||||||||||||||||||
| // Set socket non-blocking so we can interleave reads from both directions | ||||||||||||||||||
| let origFlags = fcntl(socket, F_GETFL, 0) | ||||||||||||||||||
| fcntl(socket, F_SETFL, origFlags | O_NONBLOCK) | ||||||||||||||||||
|
Comment on lines
+6264
to
+6265
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
|
|
||||||||||||||||||
| var readBuf = [UInt8](repeating: 0, count: 65536) | ||||||||||||||||||
| var inputBuffer = "" | ||||||||||||||||||
|
|
||||||||||||||||||
| // Bidirectional relay loop | ||||||||||||||||||
| while true { | ||||||||||||||||||
| // Poll socket for input with 16 ms timeout (~60 fps) | ||||||||||||||||||
| var pfd = pollfd(fd: socket, events: Int16(POLLIN), revents: 0) | ||||||||||||||||||
| poll(&pfd, 1, 16) | ||||||||||||||||||
|
|
||||||||||||||||||
| // Check for disconnect | ||||||||||||||||||
| if pfd.revents & Int16(POLLHUP) != 0 { break } | ||||||||||||||||||
| if pfd.revents & Int16(POLLERR) != 0 { break } | ||||||||||||||||||
|
|
||||||||||||||||||
| // --- PTY → socket --- | ||||||||||||||||||
| let n = ghostty_surface_pty_tap_read(surface, &readBuf, UInt32(readBuf.count)) | ||||||||||||||||||
| if n > 0 { | ||||||||||||||||||
| let data = Data(bytes: readBuf, count: Int(n)) | ||||||||||||||||||
| let b64 = data.base64EncodedString() | ||||||||||||||||||
| let frame = "{\"type\":\"output\",\"data\":\"\(b64)\"}\n" | ||||||||||||||||||
| frame.withCString { ptr in | ||||||||||||||||||
| _ = write(socket, ptr, strlen(ptr)) | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // --- socket → PTY --- | ||||||||||||||||||
| if pfd.revents & Int16(POLLIN) != 0 { | ||||||||||||||||||
| var inBuf = [UInt8](repeating: 0, count: 4096) | ||||||||||||||||||
| let bytesRead = read(socket, &inBuf, inBuf.count) | ||||||||||||||||||
| if bytesRead <= 0 { break } | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Non-blocking read() treats transient errors (EAGAIN/EWOULDBLOCK/EINTR) as disconnects, so the stream can terminate unexpectedly. Handle these errno cases and continue instead of breaking. Prompt for AI agents |
||||||||||||||||||
|
|
||||||||||||||||||
| if let str = String(bytes: inBuf[0..<bytesRead], encoding: .utf8) { | ||||||||||||||||||
| inputBuffer += str | ||||||||||||||||||
| while let newlineIdx = inputBuffer.firstIndex(of: "\n") { | ||||||||||||||||||
| let line = String(inputBuffer[..<newlineIdx]) | ||||||||||||||||||
| inputBuffer = String(inputBuffer[inputBuffer.index(after: newlineIdx)...]) | ||||||||||||||||||
| handleStreamInput(line, surface: surface) | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+6293
to
+6304
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
inputBuffer += str
if inputBuffer.count > 1_048_576 { break } // 1 MB safety cap
while let newlineIdx = inputBuffer.firstIndex(of: "\n") { |
||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Cleanup | ||||||||||||||||||
| ghostty_surface_pty_tap_close(surface) | ||||||||||||||||||
| fcntl(socket, F_SETFL, origFlags) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /// Handles a single newline-delimited JSON message from the stream client. | ||||||||||||||||||
| private nonisolated func handleStreamInput(_ jsonLine: String, surface: ghostty_surface_t) { | ||||||||||||||||||
| guard let data = jsonLine.data(using: .utf8), | ||||||||||||||||||
| let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], | ||||||||||||||||||
| let type = obj["type"] as? String else { return } | ||||||||||||||||||
|
|
||||||||||||||||||
| switch type { | ||||||||||||||||||
| case "input": | ||||||||||||||||||
| guard let b64 = obj["data"] as? String, | ||||||||||||||||||
| let bytes = Data(base64Encoded: b64) else { return } | ||||||||||||||||||
| bytes.withUnsafeBytes { rawBuf in | ||||||||||||||||||
| guard let ptr = rawBuf.baseAddress?.assumingMemoryBound(to: UInt8.self) else { return } | ||||||||||||||||||
| _ = ghostty_surface_pty_write(surface, ptr, UInt32(rawBuf.count)) | ||||||||||||||||||
| } | ||||||||||||||||||
| case "resize": | ||||||||||||||||||
| // Future: handle terminal resize | ||||||||||||||||||
| break | ||||||||||||||||||
| default: | ||||||||||||||||||
| break | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // MARK: - V2 Pane Methods | ||||||||||||||||||
|
|
||||||||||||||||||
| private func v2PaneList(params: [String: Any]) -> V2CallResult { | ||||||||||||||||||
|
|
||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 1919
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 4670
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 2540
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 1118
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 2713
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 42
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 1724
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 470
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 133
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 1348
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 98
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 175
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 42
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 11851
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 5789
Surface pointer becomes dangling if terminal panel closes during active streaming.
The raw
ghostty_surface_tpointer is captured at line 6193 and passed toenterStreamRelay, which runs an indefinite blocking relay loop on a background thread. If the user closes the terminal panel while streaming is active, theTerminalPanelwill be removed fromworkspace.panelsand deallocated. This triggersTerminalPanel.close()→surface.teardownSurface(), freeing the Ghostty surface. The relay loop then callsghostty_surface_pty_tap_read()on a dangling pointer, causing undefined behavior or a crash.Hold a strong reference to
TerminalPanel(orTerminalSurface) for the duration of the relay, or implement a cancellation mechanism (e.g., subscribe to panel removal and signal the relay to exit cleanly).🤖 Prompt for AI Agents