Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 69 additions & 27 deletions cmd/api/api/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,18 @@ type ExecRequest struct {
TTY bool `json:"tty"`
Env map[string]string `json:"env,omitempty"`
Cwd string `json:"cwd,omitempty"`
Timeout int32 `json:"timeout,omitempty"` // seconds
Timeout int32 `json:"timeout,omitempty"` // seconds
WaitForAgent int32 `json:"wait_for_agent,omitempty"` // seconds to wait for guest agent to be ready
Rows uint32 `json:"rows,omitempty"` // Initial terminal rows (0 = default)
Cols uint32 `json:"cols,omitempty"` // Initial terminal cols (0 = default)
}

// ResizeMessage represents a window resize control message
type ResizeMessage struct {
Resize struct {
Rows uint32 `json:"rows"`
Cols uint32 `json:"cols"`
} `json:"resize"`
}

// ExecHandler handles exec requests via WebSocket for bidirectional streaming
Expand Down Expand Up @@ -108,10 +118,19 @@ func (s *ApiService) ExecHandler(w http.ResponseWriter, r *http.Request) {
"cwd", execReq.Cwd,
"timeout", execReq.Timeout,
"wait_for_agent", execReq.WaitForAgent,
"rows", execReq.Rows,
"cols", execReq.Cols,
)

// Create WebSocket read/writer wrapper
wsConn := &wsReadWriter{ws: ws, ctx: ctx}
// Create resize channel for TTY sessions
var resizeChan chan *guest.WindowSize
if execReq.TTY {
resizeChan = make(chan *guest.WindowSize, 10)
defer close(resizeChan)
}

// Create WebSocket read/writer wrapper that handles resize messages
wsConn := &wsReadWriter{ws: ws, ctx: ctx, resizeChan: resizeChan}

// Create vsock dialer for this hypervisor type
dialer, err := hypervisor.NewVsockDialer(hypervisor.Type(inst.HypervisorType), inst.VsockSocket, inst.VsockCID)
Expand All @@ -133,6 +152,9 @@ func (s *ApiService) ExecHandler(w http.ResponseWriter, r *http.Request) {
Cwd: execReq.Cwd,
Timeout: execReq.Timeout,
WaitForAgent: time.Duration(execReq.WaitForAgent) * time.Second,
Rows: execReq.Rows,
Cols: execReq.Cols,
ResizeChan: resizeChan,
})

duration := time.Since(startTime)
Expand Down Expand Up @@ -167,41 +189,61 @@ func (s *ApiService) ExecHandler(w http.ResponseWriter, r *http.Request) {
}

// wsReadWriter wraps a WebSocket connection to implement io.ReadWriter
// It also handles resize control messages for TTY sessions
type wsReadWriter struct {
ws *websocket.Conn
ctx context.Context
reader io.Reader
mu sync.Mutex
ws *websocket.Conn
ctx context.Context
reader io.Reader
mu sync.Mutex
resizeChan chan<- *guest.WindowSize // Channel to send resize events (nil if not TTY)
}

func (w *wsReadWriter) Read(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()

// If we have a pending reader, continue reading from it
if w.reader != nil {
n, err = w.reader.Read(p)
if err != io.EOF {
return n, err
for {
// If we have a pending reader, continue reading from it
if w.reader != nil {
n, err = w.reader.Read(p)
if err != io.EOF {
return n, err
}
// EOF means we finished this message, get next one
w.reader = nil
}
// EOF means we finished this message, get next one
w.reader = nil
}

// Read next WebSocket message
messageType, data, err := w.ws.ReadMessage()
if err != nil {
return 0, err
}
// Read next WebSocket message
messageType, data, err := w.ws.ReadMessage()
if err != nil {
return 0, err
}

// Only handle binary and text messages
if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
return 0, fmt.Errorf("unexpected message type: %d", messageType)
}
// Handle text messages as potential control messages
if messageType == websocket.TextMessage && w.resizeChan != nil {
// Try to parse as resize message
var resizeMsg ResizeMessage
if err := json.Unmarshal(data, &resizeMsg); err == nil && resizeMsg.Resize.Rows > 0 && resizeMsg.Resize.Cols > 0 {
// Send resize event (non-blocking)
select {
case w.resizeChan <- &guest.WindowSize{Rows: resizeMsg.Resize.Rows, Cols: resizeMsg.Resize.Cols}:
default:
// Channel full, skip
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panic risk when sending to closed resize channel

Medium Severity

The resizeChan is closed via defer (line 129) before the WebSocket is closed (line 75), due to LIFO defer ordering. The wsReadWriter.Read method's select statement at lines 228-232 sends to w.resizeChan, but sending to a closed channel in Go causes a panic—even within a select with a default case. If a resize message arrives after ExecIntoInstance returns but before ExecHandler completes, the orphaned stdin goroutine could panic when processing that message.

Additional Locations (1)

Fix in Cursor Fix in Web

continue // Get next message
}
// Not a resize message, treat as stdin
}

// Create reader for this message
w.reader = bytes.NewReader(data)
return w.reader.Read(p)
// Binary messages and non-resize text messages are stdin data
if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
return 0, fmt.Errorf("unexpected message type: %d", messageType)
}

// Create reader for this message
w.reader = bytes.NewReader(data)
return w.reader.Read(p)
}
}

func (w *wsReadWriter) Write(p []byte) (n int, err error) {
Expand Down
59 changes: 59 additions & 0 deletions cmd/api/api/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package api

import (
"bytes"
"io"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -273,6 +275,45 @@ func TestExecWithDebianMinimal(t *testing.T) {
assert.Contains(t, stdout.String(), "bookworm", "Should be Debian 12 (bookworm)")
t.Logf("OS: %s", strings.Split(stdout.String(), "\n")[0])

// Test TTY with TERM environment variable and window resize
t.Run("TTY with TERM and resize", func(t *testing.T) {
stdinR, stdinW := io.Pipe()
resizeChan := make(chan *guest.WindowSize, 1)

var stdoutSync syncBuffer // Thread-safe buffer

done := make(chan struct{})
go func() {
defer close(done)
guest.ExecIntoInstance(ctx(), dialer2, guest.ExecOptions{
Command: []string{"/bin/sh", "-c", "echo $TERM; stty size; read x; stty size"},
TTY: true,
Rows: 24,
Cols: 80,
Stdin: stdinR,
Stdout: &stdoutSync,
ResizeChan: resizeChan,
})
}()

// Poll until initial output (TERM and size)
require.Eventually(t, func() bool {
out := stdoutSync.String()
return strings.Contains(out, "xterm-256color") && strings.Contains(out, "24 80")
}, 5*time.Second, 50*time.Millisecond, "TERM and initial size not printed")

// Send resize THEN stdin - gRPC stream guarantees ordering
resizeChan <- &guest.WindowSize{Rows: 50, Cols: 150}
stdinW.Write([]byte("\n"))
stdinW.Close()

// Poll until new size appears
require.Eventually(t, func() bool {
return strings.Contains(stdoutSync.String(), "50 150")
}, 5*time.Second, 50*time.Millisecond, "resized size not printed")

<-done
})
}

// collectTestLogs collects logs from an instance (non-streaming)
Expand Down Expand Up @@ -302,3 +343,21 @@ func (b *outputBuffer) Write(p []byte) (n int, err error) {
func (b *outputBuffer) String() string {
return b.buf.String()
}

// syncBuffer is a thread-safe buffer for concurrent write/read in tests
type syncBuffer struct {
mu sync.Mutex
buf bytes.Buffer
}

func (b *syncBuffer) Write(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.Write(p)
}

func (b *syncBuffer) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.String()
}
31 changes: 26 additions & 5 deletions lib/guest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,23 @@ type ExitStatus struct {
Code int
}

// Note: WindowSize is defined in guest.pb.go (proto-generated)
// Use guest.WindowSize{Rows: N, Cols: M} for resize events

// ExecOptions configures command execution
type ExecOptions struct {
Command []string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
TTY bool
Env map[string]string // Environment variables
Cwd string // Working directory (optional)
Timeout int32 // Execution timeout in seconds (0 = no timeout)
WaitForAgent time.Duration // Max time to wait for agent to be ready (0 = no wait, fail immediately)
Env map[string]string // Environment variables
Cwd string // Working directory (optional)
Timeout int32 // Execution timeout in seconds (0 = no timeout)
WaitForAgent time.Duration // Max time to wait for agent to be ready (0 = no wait, fail immediately)
Rows uint32 // Initial terminal rows (0 = default 24)
Cols uint32 // Initial terminal cols (0 = default 80)
ResizeChan <-chan *WindowSize // Optional: channel to receive resize events (pointer to avoid copying mutex)
}

// ExecIntoInstance executes command in instance via vsock using gRPC.
Expand Down Expand Up @@ -203,7 +209,7 @@ func execIntoInstanceOnce(ctx context.Context, dialer hypervisor.VsockDialer, op
// Ensure stream is properly closed when we're done
defer stream.CloseSend()

// Send start request
// Send start request with initial window size
if err := stream.Send(&ExecRequest{
Request: &ExecRequest_Start{
Start: &ExecStart{
Expand All @@ -212,6 +218,8 @@ func execIntoInstanceOnce(ctx context.Context, dialer hypervisor.VsockDialer, op
Env: opts.Env,
Cwd: opts.Cwd,
TimeoutSeconds: opts.Timeout,
Rows: opts.Rows,
Cols: opts.Cols,
},
},
}); err != nil {
Expand All @@ -238,6 +246,19 @@ func execIntoInstanceOnce(ctx context.Context, dialer hypervisor.VsockDialer, op
}()
}

// Handle resize events in background (if channel provided)
if opts.ResizeChan != nil {
go func() {
for resize := range opts.ResizeChan {
stream.Send(&ExecRequest{
Request: &ExecRequest_Resize{
Resize: resize,
},
})
}
}()
}

// Receive responses
var totalStdout, totalStderr int
for {
Expand Down
Loading
Loading