-
-
Notifications
You must be signed in to change notification settings - Fork 959
Add surface.stream command for real-time PTY streaming #2612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1655,6 +1655,21 @@ class TerminalController { | |||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Detect streaming command before normal processing. | ||||||||||||||||||
| // surface.stream takes over the socket connection for bidirectional | ||||||||||||||||||
| // PTY relay, so it must bypass the normal request/response loop. | ||||||||||||||||||
| if trimmed.contains("\"surface.stream\"") { | ||||||||||||||||||
| if let data = trimmed.data(using: .utf8), | ||||||||||||||||||
| let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any], | ||||||||||||||||||
| let method = json["method"] as? String, | ||||||||||||||||||
| method == "surface.stream" { | ||||||||||||||||||
| let params = json["params"] as? [String: Any] ?? [:] | ||||||||||||||||||
| let id = json["id"] | ||||||||||||||||||
| v2SurfaceStream(id: id, params: params, socket: socket) | ||||||||||||||||||
| return // stream took over this connection | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| let response = processCommand(trimmed) | ||||||||||||||||||
| writeSocketResponse(response, to: socket) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
@@ -2387,6 +2402,11 @@ class TerminalController { | |||||||||||||||||
| case "surface.read_text": | ||||||||||||||||||
| return v2Result(id: id, self.v2SurfaceReadText(params: params)) | ||||||||||||||||||
|
|
||||||||||||||||||
| case "surface.stream": | ||||||||||||||||||
| // surface.stream is handled in handleClient before processCommand; | ||||||||||||||||||
| // if we reach here the caller used the wrong transport path. | ||||||||||||||||||
| return v2Error(id: id, code: "invalid_request", | ||||||||||||||||||
| message: "surface.stream must be sent as the first command on a dedicated connection") | ||||||||||||||||||
|
|
||||||||||||||||||
| #if DEBUG | ||||||||||||||||||
| // Debug / test-only | ||||||||||||||||||
|
|
@@ -2515,6 +2535,7 @@ class TerminalController { | |||||||||||||||||
| "surface.read_text", | ||||||||||||||||||
| "surface.clear_history", | ||||||||||||||||||
| "surface.trigger_flash", | ||||||||||||||||||
| "surface.stream", | ||||||||||||||||||
| "pane.list", | ||||||||||||||||||
| "pane.focus", | ||||||||||||||||||
| "pane.surfaces", | ||||||||||||||||||
|
|
@@ -6127,6 +6148,190 @@ class TerminalController { | |||||||||||||||||
| return result | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // MARK: - V2 Surface Stream | ||||||||||||||||||
|
|
||||||||||||||||||
| /// Resolves the target surface, sends a stream-start acknowledgement, and enters | ||||||||||||||||||
| /// a bidirectional PTY relay loop that owns the socket until the client disconnects. | ||||||||||||||||||
| /// Called from `handleClient` *before* the normal command dispatch so it can take | ||||||||||||||||||
| /// over the connection. | ||||||||||||||||||
| private func v2SurfaceStream(id: Any?, params: [String: Any], socket: Int32) { | ||||||||||||||||||
| guard let tabManager = v2ResolveTabManager(params: params) else { | ||||||||||||||||||
| let err = v2Error(id: id, code: "unavailable", message: "TabManager not available") | ||||||||||||||||||
| writeSocketResponse(err, to: socket) | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| var surfacePtr: ghostty_surface_t? = nil | ||||||||||||||||||
| var errorResponse: String? = nil | ||||||||||||||||||
|
|
||||||||||||||||||
| v2MainSync { | ||||||||||||||||||
| guard let ws = v2ResolveWorkspace(params: params, tabManager: tabManager) else { | ||||||||||||||||||
| errorResponse = v2Error(id: id, code: "not_found", message: "Workspace not found") | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| let surfaceId: UUID? | ||||||||||||||||||
| if params["surface_id"] != nil { | ||||||||||||||||||
| surfaceId = v2UUID(params, "surface_id") | ||||||||||||||||||
| guard surfaceId != nil else { | ||||||||||||||||||
| errorResponse = v2Error(id: id, code: "not_found", | ||||||||||||||||||
| message: "Surface not found for the given surface_id") | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| } else { | ||||||||||||||||||
| surfaceId = ws.focusedPanelId | ||||||||||||||||||
| } | ||||||||||||||||||
| guard let surfaceId else { | ||||||||||||||||||
| errorResponse = v2Error(id: id, code: "not_found", message: "No focused surface") | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| guard let terminalPanel = ws.terminalPanel(for: surfaceId) else { | ||||||||||||||||||
| errorResponse = v2Error(id: id, code: "invalid_params", | ||||||||||||||||||
| message: "Surface is not a terminal", | ||||||||||||||||||
| data: ["surface_id": surfaceId.uuidString]) | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
| surfacePtr = terminalPanel.surface.surface | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if let errorResponse { | ||||||||||||||||||
| writeSocketResponse(errorResponse, to: socket) | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| guard let surface = surfacePtr else { | ||||||||||||||||||
| let err = v2Error(id: id, code: "not_found", message: "Terminal surface not ready") | ||||||||||||||||||
| writeSocketResponse(err, to: socket) | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Send stream-start acknowledgement | ||||||||||||||||||
| let startMsg = v2Ok(id: id, result: ["stream": true]) | ||||||||||||||||||
| writeSocketResponse(startMsg, to: socket) | ||||||||||||||||||
|
|
||||||||||||||||||
| // Enter the blocking relay loop (runs on the current background thread) | ||||||||||||||||||
| enterStreamRelay(socket: socket, surface: surface) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /// Read the current visible terminal screen as plain text. | ||||||||||||||||||
| /// Used to send an initial snapshot before live PTY streaming begins. | ||||||||||||||||||
| private nonisolated func readScreenSnapshot(surface: ghostty_surface_t) -> String? { | ||||||||||||||||||
| let topLeft = ghostty_point_s( | ||||||||||||||||||
| tag: GHOSTTY_POINT_VIEWPORT, | ||||||||||||||||||
| coord: GHOSTTY_POINT_COORD_TOP_LEFT, | ||||||||||||||||||
| x: 0, y: 0 | ||||||||||||||||||
| ) | ||||||||||||||||||
| let bottomRight = ghostty_point_s( | ||||||||||||||||||
| tag: GHOSTTY_POINT_VIEWPORT, | ||||||||||||||||||
| coord: GHOSTTY_POINT_COORD_BOTTOM_RIGHT, | ||||||||||||||||||
| x: 0, y: 0 | ||||||||||||||||||
| ) | ||||||||||||||||||
| let selection = ghostty_selection_s( | ||||||||||||||||||
| top_left: topLeft, | ||||||||||||||||||
| bottom_right: bottomRight, | ||||||||||||||||||
| rectangle: false | ||||||||||||||||||
| ) | ||||||||||||||||||
| var text = ghostty_text_s() | ||||||||||||||||||
| guard ghostty_surface_read_text(surface, selection, &text) else { return nil } | ||||||||||||||||||
| defer { ghostty_surface_free_text(surface, &text) } | ||||||||||||||||||
| guard let ptr = text.text, text.text_len > 0 else { return nil } | ||||||||||||||||||
| return String(decoding: Data(bytes: ptr, count: Int(text.text_len)), as: UTF8.self) | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider using failable
Optional fix- return String(decoding: Data(bytes: ptr, count: Int(text.text_len)), as: UTF8.self)
+ return String(bytes: Data(bytes: ptr, count: Int(text.text_len)), encoding: .utf8)That said, the replacement behavior may be acceptable here since terminal output can contain arbitrary bytes and graceful degradation is reasonable. 🧰 Tools🪛 SwiftLint (0.63.2)[Warning] 6243-6243: Prefer failable (optional_data_string_conversion) 🤖 Prompt for AI Agents |
||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /// Bidirectional relay between a Unix socket and a Ghostty PTY tap. | ||||||||||||||||||
| /// Sends an initial screen snapshot, then streams live PTY output as | ||||||||||||||||||
| /// base64-encoded JSON frames. Reads JSON input from the socket and writes | ||||||||||||||||||
| /// it into the PTY via `ghostty_surface_pty_write`. | ||||||||||||||||||
| /// Blocks the calling thread until the client disconnects or an error occurs. | ||||||||||||||||||
| private nonisolated func enterStreamRelay(socket: Int32, surface: ghostty_surface_t) { | ||||||||||||||||||
| // Open PTY tap with a 64 KB ring buffer | ||||||||||||||||||
| guard ghostty_surface_pty_tap_open(surface, 65536) else { | ||||||||||||||||||
| let err = "{\"type\":\"error\",\"message\":\"Failed to open PTY tap\"}\n" | ||||||||||||||||||
| err.withCString { ptr in _ = write(socket, ptr, strlen(ptr)) } | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Send initial screen snapshot so the phone sees existing content | ||||||||||||||||||
| if let snapshot = readScreenSnapshot(surface: surface) { | ||||||||||||||||||
| let snapshotData = Data(snapshot.utf8) | ||||||||||||||||||
| let b64 = snapshotData.base64EncodedString() | ||||||||||||||||||
| let frame = "{\"type\":\"snapshot\",\"data\":\"\(b64)\"}\n" | ||||||||||||||||||
| frame.withCString { ptr in | ||||||||||||||||||
| _ = write(socket, ptr, strlen(ptr)) | ||||||||||||||||||
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
||||||||||||||||||
|
|
||||||||||||||||||
| // Set socket non-blocking so we can interleave reads from both directions | ||||||||||||||||||
| let origFlags = fcntl(socket, F_GETFL, 0) | ||||||||||||||||||
| fcntl(socket, F_SETFL, origFlags | O_NONBLOCK) | ||||||||||||||||||
|
||||||||||||||||||
| let origFlags = fcntl(socket, F_GETFL, 0) | |
| fcntl(socket, F_SETFL, origFlags | O_NONBLOCK) | |
| let origFlags = fcntl(socket, F_GETFL, 0) | |
| guard origFlags != -1 else { | |
| ghostty_surface_pty_tap_close(surface) | |
| return | |
| } | |
| fcntl(socket, F_SETFL, origFlags | O_NONBLOCK) |
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inputBuffer accumulates raw bytes from the socket and is only drained when a \n delimiter is found. A client that sends a large payload (or a corrupt/malicious stream) without a newline will grow this buffer without bound until the connection closes or the process OOMs. Add a hard cap and break the loop when it is exceeded:
inputBuffer += str
if inputBuffer.count > 1_048_576 { break } // 1 MB safety cap
while let newlineIdx = inputBuffer.firstIndex(of: "\n") {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 1919
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 4670
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 2540
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 1118
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 2713
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 42
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 1724
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 470
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 133
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 1348
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 98
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 175
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 42
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 11851
🏁 Script executed:
Repository: manaflow-ai/cmux
Length of output: 5789
Surface pointer becomes dangling if terminal panel closes during active streaming.
The raw
ghostty_surface_tpointer is captured at line 6193 and passed toenterStreamRelay, which runs an indefinite blocking relay loop on a background thread. If the user closes the terminal panel while streaming is active, theTerminalPanelwill be removed fromworkspace.panelsand deallocated. This triggersTerminalPanel.close()→surface.teardownSurface(), freeing the Ghostty surface. The relay loop then callsghostty_surface_pty_tap_read()on a dangling pointer, causing undefined behavior or a crash.Hold a strong reference to
TerminalPanel(orTerminalSurface) for the duration of the relay, or implement a cancellation mechanism (e.g., subscribe to panel removal and signal the relay to exit cleanly).🤖 Prompt for AI Agents