Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion images/chromium-headful/run-unikernel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ trap 'rm -rf "$FLAGS_DIR"' EXIT


deploy_args=(
--vcpus 4
-M 4096
-p 9222:9222/tls
-p 444:10001/tls
-e DISPLAY_NUM=1
-e HEIGHT=768
-e WIDTH=1024
-e RUN_AS_ROOT="$RUN_AS_ROOT" \
-e RUN_AS_ROOT="$RUN_AS_ROOT"
-e LOG_CDP_MESSAGES=true
-v "$volume_name":/chromium
-n "$NAME"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[program:kernel-images-api]
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" exec /usr/local/bin/kernel-images-api'
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
autostart=false
autorestart=true
startsecs=2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[program:kernel-images-api]
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" exec /usr/local/bin/kernel-images-api'
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
autostart=false
autorestart=true
startsecs=2
Expand Down
3 changes: 2 additions & 1 deletion images/chromium-headless/run-unikernel.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ deploy_args=(
--scale-to-zero idle
--scale-to-zero-cooldown 3000ms
--scale-to-zero-stateful
--vcpus 1
-M 1024
-e RUN_AS_ROOT="$RUN_AS_ROOT"
-e LOG_CDP_MESSAGES=true \
-p 9222:9222/tls
-p 444:10001/tls
--vcpus 2
-n "$NAME"
)

Expand Down
2 changes: 1 addition & 1 deletion server/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func main() {
})
})
rDevtools.Get("/*", func(w http.ResponseWriter, r *http.Request) {
devtoolsproxy.WebSocketProxyHandler(upstreamMgr, slogger).ServeHTTP(w, r)
devtoolsproxy.WebSocketProxyHandler(upstreamMgr, slogger, config.LogCDPMessages).ServeHTTP(w, r)
})

srvDevtools := &http.Server{
Expand Down
3 changes: 3 additions & 0 deletions server/cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Config struct {

// Absolute or relative path to the ffmpeg binary. If empty the code falls back to "ffmpeg" on $PATH.
PathToFFmpeg string `envconfig:"FFMPEG_PATH" default:"ffmpeg"`

// DevTools proxy configuration
LogCDPMessages bool `envconfig:"LOG_CDP_MESSAGES" default:"false"`
}

// Load loads configuration from environment variables
Expand Down
205 changes: 121 additions & 84 deletions server/lib/devtoolsproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -145,8 +146,8 @@ func (u *UpstreamManager) runTailOnce(ctx context.Context) {

// WebSocketProxyHandler returns an http.Handler that upgrades incoming connections and
// proxies them to the current upstream websocket URL. It expects only websocket requests.
func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handler {
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
// If logCDPMessages is true, all CDP messages will be logged with their direction.
func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMessages bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upstreamCurrent := mgr.Current()
if upstreamCurrent == "" {
Expand All @@ -160,17 +161,41 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handl
}
// Always use the full upstream path and query, ignoring the client's request path/query
upstreamURL := (&url.URL{Scheme: parsed.Scheme, Host: parsed.Host, Path: parsed.Path, RawQuery: parsed.RawQuery}).String()
upgrader := websocket.Upgrader{
ReadBufferSize: 65536,
WriteBufferSize: 65536,
EnableCompression: true,
CheckOrigin: func(r *http.Request) bool { return true },
}
logger.Info("upgrader config", slog.Any("upgrader", upgrader))
clientConn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Error("websocket upgrade failed", slog.String("err", err.Error()))
return
}
upstreamConn, _, err := websocket.DefaultDialer.Dial(upstreamURL, nil)
clientConn.SetReadDeadline(time.Time{}) // No timeout--hold on to connections for dear life
clientConn.SetWriteDeadline(time.Time{}) // No timeout--hold on to connections for dear life
clientConn.SetReadLimit(100 * 1024 * 1024) // 100 MB. Effectively no maximum size of message from client
clientConn.EnableWriteCompression(true)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium Security

Setting ReadLimit to 100MB could potentially expose the system to memory exhaustion attacks if a malicious client sends very large messages. Consider if this limit is truly necessary or if a more conservative limit (e.g., 10MB) would still handle legitimate CDP use cases.
Agent: 🤖 General

clientConn.SetCompressionLevel(6)

dialer := websocket.Dialer{
ReadBufferSize: 65536,
WriteBufferSize: 65536,
HandshakeTimeout: 30 * time.Second,
}
logger.Info("dialer config", slog.Any("dialer", dialer))
upstreamConn, _, err := dialer.Dial(upstreamURL, nil)
if err != nil {
logger.Error("dial upstream failed", slog.String("err", err.Error()), slog.String("url", upstreamURL))
_ = clientConn.Close()
return
}
upstreamConn.SetReadLimit(100 * 1024 * 1024) // 100 MB. Effectively no maximum size of message from upstream
upstreamConn.EnableWriteCompression(true)
upstreamConn.SetCompressionLevel(6)
upstreamConn.SetReadDeadline(time.Time{}) // no timeout
upstreamConn.SetWriteDeadline(time.Time{}) // no timeout
logger.Debug("proxying devtools websocket", slog.String("url", upstreamURL))

var once sync.Once
Expand All @@ -180,7 +205,7 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handl
_ = clientConn.Close()
})
}
proxyWebSocket(r.Context(), clientConn, upstreamConn, cleanup, logger)
proxyWebSocket(r.Context(), clientConn, upstreamConn, cleanup, logger, logCDPMessages)
})
}

Expand All @@ -190,47 +215,103 @@ type wsConn interface {
Close() error
}

func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClose func(), logger *slog.Logger) {
errChan := make(chan error, 2)
// logCDPMessage logs a CDP message with its direction if logging is enabled
func logCDPMessage(logger *slog.Logger, direction string, mt int, msg []byte) {
if mt != websocket.TextMessage {
return // Only log text messages (CDP messages)
}

// Extract fields using regex from raw message
rawMsg := string(msg)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium Logic

The regex-based JSON parsing in logCDPMessage is fragile and could fail with escaped quotes, nested objects, or malformed JSON. Consider using json.Unmarshal with a simple struct that captures the fields you need, which would be more robust and performant.
Agent: 🤖 General

// Regex patterns to match "key":"val" or "key": "val" for string values
extractStringField := func(key string) string {
pattern := fmt.Sprintf(`"%s"\s*:\s*"([^"]*)"`, key)
re := regexp.MustCompile(pattern)
matches := re.FindStringSubmatch(rawMsg)
if len(matches) > 1 {
return matches[1]
}
return ""
}

// Regex pattern to match "key": number for numeric id
extractNumberField := func(key string) interface{} {
pattern := fmt.Sprintf(`"%s"\s*:\s*(\d+)`, key)
re := regexp.MustCompile(pattern)
matches := re.FindStringSubmatch(rawMsg)
if len(matches) > 1 {
// Try to parse as int first
if val, err := strconv.Atoi(matches[1]); err == nil {
return val
}
// Fall back to float64
if val, err := strconv.ParseFloat(matches[1], 64); err == nil {
return val
}
}
return nil
}

// Extract fields using regex
method := extractStringField("method")
id := extractNumberField("id")
sessionId := extractStringField("sessionId")
targetId := extractStringField("targetId")
frameId := extractStringField("frameId")

// Build log attributes, only including non-empty values
attrs := []slog.Attr{
slog.String("dir", direction),
}

if sessionId != "" {
attrs = append(attrs, slog.String("sessionId", sessionId))
}
if targetId != "" {
attrs = append(attrs, slog.String("targetId", targetId))
}
if id != nil {
attrs = append(attrs, slog.Any("id", id))
}
if frameId != "" {
attrs = append(attrs, slog.String("frameId", frameId))
}

if method != "" {
attrs = append(attrs, slog.String("method", method))
}

attrs = append(attrs, slog.Int("raw_length", len(msg)))

// Single-writer guarantee for client connection
var clientWriteMu sync.Mutex
// Convert attrs to individual slog.Attr arguments
args := make([]any, len(attrs))
for i, attr := range attrs {
args[i] = attr
}

// Heartbeat tracking
var hbMu sync.Mutex
lastClientActivity := time.Now()
var lastPingSent time.Time
var lastPongReceived time.Time
var outstandingPing bool
logger.Info("cdp", args...)
}

func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClose func(), logger *slog.Logger, logCDPMessages bool) {
errChan := make(chan error, 2)

go func() {
for {
mt, msg, err := clientConn.ReadMessage()
if err != nil {
logger.Error("client read error", slog.String("err", err.Error()))
errChan <- err
break
}
// Record any client activity
hbMu.Lock()
lastClientActivity = time.Now()
hbMu.Unlock()

// Handle control frames from client
if mt == websocket.PongMessage {
hbMu.Lock()
lastPongReceived = time.Now()
outstandingPing = false
hbMu.Unlock()
continue
}
if mt == websocket.PingMessage {
clientWriteMu.Lock()
_ = clientConn.WriteMessage(websocket.PongMessage, nil)
clientWriteMu.Unlock()
continue

// Log CDP messages if enabled
if logCDPMessages {
logCDPMessage(logger, "->", mt, msg)
}

if err := upstreamConn.WriteMessage(mt, msg); err != nil {
logger.Error("upstream write error", slog.String("err", err.Error()))
errChan <- err
break
}
Expand All @@ -240,65 +321,21 @@ func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClos
for {
mt, msg, err := upstreamConn.ReadMessage()
if err != nil {
logger.Error("upstream read error", slog.String("err", err.Error()))
errChan <- err
break
}
clientWriteMu.Lock()

// Log CDP messages if enabled
if logCDPMessages {
logCDPMessage(logger, "<-", mt, msg)
}

if err := clientConn.WriteMessage(mt, msg); err != nil {
clientWriteMu.Unlock()
logger.Error("client write error", slog.String("err", err.Error()))
errChan <- err
break
}
clientWriteMu.Unlock()
}
}()

// Heartbeat goroutine
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
now := time.Now()
hbMu.Lock()
inactivity := now.Sub(lastClientActivity)
pingOutstanding := outstandingPing
lastPing := lastPingSent
lastPong := lastPongReceived
hbMu.Unlock()

if pingOutstanding {
if now.Sub(lastPing) > 10*time.Second && lastPong.Before(lastPing) {
logger.Warn("client ping timeout; closing devtools websocket")
select {
case errChan <- fmt.Errorf("ping timeout"):
default:
}
return
}
continue
}

if inactivity >= 30*time.Second {
clientWriteMu.Lock()
pingErr := clientConn.WriteMessage(websocket.PingMessage, nil)
clientWriteMu.Unlock()
if pingErr != nil {
select {
case errChan <- pingErr:
default:
}
return
}
hbMu.Lock()
lastPingSent = now
outstandingPing = true
hbMu.Unlock()
}
}
}
}()

Expand Down
2 changes: 1 addition & 1 deletion server/lib/devtoolsproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestWebSocketProxyHandler_ProxiesEcho(t *testing.T) {
// seed current upstream to echo server including path/query (bypass tailing)
mgr.setCurrent((&url.URL{Scheme: u.Scheme, Host: u.Host, Path: u.Path, RawQuery: u.RawQuery}).String())

proxy := WebSocketProxyHandler(mgr, logger)
proxy := WebSocketProxyHandler(mgr, logger, false)
proxySrv := httptest.NewServer(proxy)
defer proxySrv.Close()

Expand Down
Loading
Loading