diff --git a/images/chromium-headful/run-unikernel.sh b/images/chromium-headful/run-unikernel.sh index 37655904..9dc9cfe5 100755 --- a/images/chromium-headful/run-unikernel.sh +++ b/images/chromium-headful/run-unikernel.sh @@ -41,13 +41,15 @@ trap 'rm -rf "$FLAGS_DIR"' EXIT deploy_args=( + --vcpus 4 -M 4096 -p 9222:9222/tls -p 444:10001/tls -e DISPLAY_NUM=1 -e HEIGHT=768 -e WIDTH=1024 - -e RUN_AS_ROOT="$RUN_AS_ROOT" \ + -e RUN_AS_ROOT="$RUN_AS_ROOT" + -e LOG_CDP_MESSAGES=true -v "$volume_name":/chromium -n "$NAME" ) diff --git a/images/chromium-headful/supervisor/services/kernel-images-api.conf b/images/chromium-headful/supervisor/services/kernel-images-api.conf index a04bfb35..e57d30a8 100644 --- a/images/chromium-headful/supervisor/services/kernel-images-api.conf +++ b/images/chromium-headful/supervisor/services/kernel-images-api.conf @@ -1,5 +1,5 @@ [program:kernel-images-api] -command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" exec /usr/local/bin/kernel-images-api' +command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api' autostart=false autorestart=true startsecs=2 diff --git a/images/chromium-headless/image/supervisor/services/kernel-images-api.conf b/images/chromium-headless/image/supervisor/services/kernel-images-api.conf index a04bfb35..e57d30a8 100644 --- a/images/chromium-headless/image/supervisor/services/kernel-images-api.conf +++ b/images/chromium-headless/image/supervisor/services/kernel-images-api.conf @@ -1,5 +1,5 @@ [program:kernel-images-api] -command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" exec /usr/local/bin/kernel-images-api' +command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api' autostart=false autorestart=true startsecs=2 diff --git a/images/chromium-headless/run-unikernel.sh b/images/chromium-headless/run-unikernel.sh index 2dda6068..eb1b98c7 100755 --- a/images/chromium-headless/run-unikernel.sh +++ b/images/chromium-headless/run-unikernel.sh @@ -15,11 +15,12 @@ deploy_args=( --scale-to-zero idle --scale-to-zero-cooldown 3000ms --scale-to-zero-stateful + --vcpus 1 -M 1024 -e RUN_AS_ROOT="$RUN_AS_ROOT" + -e LOG_CDP_MESSAGES=true \ -p 9222:9222/tls -p 444:10001/tls - --vcpus 2 -n "$NAME" ) diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 57806754..83b26f0f 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -141,7 +141,7 @@ func main() { }) }) rDevtools.Get("/*", func(w http.ResponseWriter, r *http.Request) { - devtoolsproxy.WebSocketProxyHandler(upstreamMgr, slogger).ServeHTTP(w, r) + devtoolsproxy.WebSocketProxyHandler(upstreamMgr, slogger, config.LogCDPMessages).ServeHTTP(w, r) }) srvDevtools := &http.Server{ diff --git a/server/cmd/config/config.go b/server/cmd/config/config.go index 52e8ffc6..7b063d3b 100644 --- a/server/cmd/config/config.go +++ b/server/cmd/config/config.go @@ -19,6 +19,9 @@ type Config struct { // Absolute or relative path to the ffmpeg binary. If empty the code falls back to "ffmpeg" on $PATH. PathToFFmpeg string `envconfig:"FFMPEG_PATH" default:"ffmpeg"` + + // DevTools proxy configuration + LogCDPMessages bool `envconfig:"LOG_CDP_MESSAGES" default:"false"` } // Load loads configuration from environment variables diff --git a/server/lib/devtoolsproxy/proxy.go b/server/lib/devtoolsproxy/proxy.go index c073fb42..ce803ae6 100644 --- a/server/lib/devtoolsproxy/proxy.go +++ b/server/lib/devtoolsproxy/proxy.go @@ -10,6 +10,7 @@ import ( "net/url" "os/exec" "regexp" + "strconv" "strings" "sync" "sync/atomic" @@ -145,8 +146,8 @@ func (u *UpstreamManager) runTailOnce(ctx context.Context) { // WebSocketProxyHandler returns an http.Handler that upgrades incoming connections and // proxies them to the current upstream websocket URL. It expects only websocket requests. -func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handler { - upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} +// If logCDPMessages is true, all CDP messages will be logged with their direction. +func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMessages bool) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { upstreamCurrent := mgr.Current() if upstreamCurrent == "" { @@ -160,17 +161,41 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handl } // Always use the full upstream path and query, ignoring the client's request path/query upstreamURL := (&url.URL{Scheme: parsed.Scheme, Host: parsed.Host, Path: parsed.Path, RawQuery: parsed.RawQuery}).String() + upgrader := websocket.Upgrader{ + ReadBufferSize: 65536, + WriteBufferSize: 65536, + EnableCompression: true, + CheckOrigin: func(r *http.Request) bool { return true }, + } + logger.Info("upgrader config", slog.Any("upgrader", upgrader)) clientConn, err := upgrader.Upgrade(w, r, nil) if err != nil { logger.Error("websocket upgrade failed", slog.String("err", err.Error())) return } - upstreamConn, _, err := websocket.DefaultDialer.Dial(upstreamURL, nil) + clientConn.SetReadDeadline(time.Time{}) // No timeout--hold on to connections for dear life + clientConn.SetWriteDeadline(time.Time{}) // No timeout--hold on to connections for dear life + clientConn.SetReadLimit(100 * 1024 * 1024) // 100 MB. Effectively no maximum size of message from client + clientConn.EnableWriteCompression(true) + clientConn.SetCompressionLevel(6) + + dialer := websocket.Dialer{ + ReadBufferSize: 65536, + WriteBufferSize: 65536, + HandshakeTimeout: 30 * time.Second, + } + logger.Info("dialer config", slog.Any("dialer", dialer)) + upstreamConn, _, err := dialer.Dial(upstreamURL, nil) if err != nil { logger.Error("dial upstream failed", slog.String("err", err.Error()), slog.String("url", upstreamURL)) _ = clientConn.Close() return } + upstreamConn.SetReadLimit(100 * 1024 * 1024) // 100 MB. Effectively no maximum size of message from upstream + upstreamConn.EnableWriteCompression(true) + upstreamConn.SetCompressionLevel(6) + upstreamConn.SetReadDeadline(time.Time{}) // no timeout + upstreamConn.SetWriteDeadline(time.Time{}) // no timeout logger.Debug("proxying devtools websocket", slog.String("url", upstreamURL)) var once sync.Once @@ -180,7 +205,7 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handl _ = clientConn.Close() }) } - proxyWebSocket(r.Context(), clientConn, upstreamConn, cleanup, logger) + proxyWebSocket(r.Context(), clientConn, upstreamConn, cleanup, logger, logCDPMessages) }) } @@ -190,47 +215,103 @@ type wsConn interface { Close() error } -func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClose func(), logger *slog.Logger) { - errChan := make(chan error, 2) +// logCDPMessage logs a CDP message with its direction if logging is enabled +func logCDPMessage(logger *slog.Logger, direction string, mt int, msg []byte) { + if mt != websocket.TextMessage { + return // Only log text messages (CDP messages) + } + + // Extract fields using regex from raw message + rawMsg := string(msg) + + // Regex patterns to match "key":"val" or "key": "val" for string values + extractStringField := func(key string) string { + pattern := fmt.Sprintf(`"%s"\s*:\s*"([^"]*)"`, key) + re := regexp.MustCompile(pattern) + matches := re.FindStringSubmatch(rawMsg) + if len(matches) > 1 { + return matches[1] + } + return "" + } + + // Regex pattern to match "key": number for numeric id + extractNumberField := func(key string) interface{} { + pattern := fmt.Sprintf(`"%s"\s*:\s*(\d+)`, key) + re := regexp.MustCompile(pattern) + matches := re.FindStringSubmatch(rawMsg) + if len(matches) > 1 { + // Try to parse as int first + if val, err := strconv.Atoi(matches[1]); err == nil { + return val + } + // Fall back to float64 + if val, err := strconv.ParseFloat(matches[1], 64); err == nil { + return val + } + } + return nil + } + + // Extract fields using regex + method := extractStringField("method") + id := extractNumberField("id") + sessionId := extractStringField("sessionId") + targetId := extractStringField("targetId") + frameId := extractStringField("frameId") + + // Build log attributes, only including non-empty values + attrs := []slog.Attr{ + slog.String("dir", direction), + } + + if sessionId != "" { + attrs = append(attrs, slog.String("sessionId", sessionId)) + } + if targetId != "" { + attrs = append(attrs, slog.String("targetId", targetId)) + } + if id != nil { + attrs = append(attrs, slog.Any("id", id)) + } + if frameId != "" { + attrs = append(attrs, slog.String("frameId", frameId)) + } + + if method != "" { + attrs = append(attrs, slog.String("method", method)) + } + + attrs = append(attrs, slog.Int("raw_length", len(msg))) - // Single-writer guarantee for client connection - var clientWriteMu sync.Mutex + // Convert attrs to individual slog.Attr arguments + args := make([]any, len(attrs)) + for i, attr := range attrs { + args[i] = attr + } - // Heartbeat tracking - var hbMu sync.Mutex - lastClientActivity := time.Now() - var lastPingSent time.Time - var lastPongReceived time.Time - var outstandingPing bool + logger.Info("cdp", args...) +} + +func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClose func(), logger *slog.Logger, logCDPMessages bool) { + errChan := make(chan error, 2) go func() { for { mt, msg, err := clientConn.ReadMessage() if err != nil { + logger.Error("client read error", slog.String("err", err.Error())) errChan <- err break } - // Record any client activity - hbMu.Lock() - lastClientActivity = time.Now() - hbMu.Unlock() - - // Handle control frames from client - if mt == websocket.PongMessage { - hbMu.Lock() - lastPongReceived = time.Now() - outstandingPing = false - hbMu.Unlock() - continue - } - if mt == websocket.PingMessage { - clientWriteMu.Lock() - _ = clientConn.WriteMessage(websocket.PongMessage, nil) - clientWriteMu.Unlock() - continue + + // Log CDP messages if enabled + if logCDPMessages { + logCDPMessage(logger, "->", mt, msg) } if err := upstreamConn.WriteMessage(mt, msg); err != nil { + logger.Error("upstream write error", slog.String("err", err.Error())) errChan <- err break } @@ -240,65 +321,21 @@ func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClos for { mt, msg, err := upstreamConn.ReadMessage() if err != nil { + logger.Error("upstream read error", slog.String("err", err.Error())) errChan <- err break } - clientWriteMu.Lock() + + // Log CDP messages if enabled + if logCDPMessages { + logCDPMessage(logger, "<-", mt, msg) + } + if err := clientConn.WriteMessage(mt, msg); err != nil { - clientWriteMu.Unlock() + logger.Error("client write error", slog.String("err", err.Error())) errChan <- err break } - clientWriteMu.Unlock() - } - }() - - // Heartbeat goroutine - go func() { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - now := time.Now() - hbMu.Lock() - inactivity := now.Sub(lastClientActivity) - pingOutstanding := outstandingPing - lastPing := lastPingSent - lastPong := lastPongReceived - hbMu.Unlock() - - if pingOutstanding { - if now.Sub(lastPing) > 10*time.Second && lastPong.Before(lastPing) { - logger.Warn("client ping timeout; closing devtools websocket") - select { - case errChan <- fmt.Errorf("ping timeout"): - default: - } - return - } - continue - } - - if inactivity >= 30*time.Second { - clientWriteMu.Lock() - pingErr := clientConn.WriteMessage(websocket.PingMessage, nil) - clientWriteMu.Unlock() - if pingErr != nil { - select { - case errChan <- pingErr: - default: - } - return - } - hbMu.Lock() - lastPingSent = now - outstandingPing = true - hbMu.Unlock() - } - } } }() diff --git a/server/lib/devtoolsproxy/proxy_test.go b/server/lib/devtoolsproxy/proxy_test.go index d2cf8919..3a50ca64 100644 --- a/server/lib/devtoolsproxy/proxy_test.go +++ b/server/lib/devtoolsproxy/proxy_test.go @@ -98,7 +98,7 @@ func TestWebSocketProxyHandler_ProxiesEcho(t *testing.T) { // seed current upstream to echo server including path/query (bypass tailing) mgr.setCurrent((&url.URL{Scheme: u.Scheme, Host: u.Host, Path: u.Path, RawQuery: u.RawQuery}).String()) - proxy := WebSocketProxyHandler(mgr, logger) + proxy := WebSocketProxyHandler(mgr, logger, false) proxySrv := httptest.NewServer(proxy) defer proxySrv.Close() diff --git a/shared/uk-check-stats.sh b/shared/uk-check-stats.sh index fffd9fcc..fd0e2bd0 100755 --- a/shared/uk-check-stats.sh +++ b/shared/uk-check-stats.sh @@ -23,16 +23,57 @@ fi # get instance stats in a loop until ctrl-c trap 'echo "Stopping stats collection..."; exit 0' INT -echo -e "RSS\tCPU Time\tTX Bytes\tNConns\tNReqs\tNQueued\tNTotal" +echo -e "Timestamp\tRSS (MB)\tCPU%\tTX Bytes (MB)\tKB/s\tNConns\tNReqs\tNQueued\tNTotal" + +# Initialize previous values for calculations +prev_cpu_time="" +prev_tx_bytes="" + while true; do + # Get current timestamp with millisecond resolution + timestamp=$(date '+%Y-%m-%d %H:%M:%S.%3N') + metrics=$(curl -s -H "Authorization: Bearer $UKC_TOKEN" "$UKC_METRO/instances/$instance_id/metrics") - rss=$(echo "$metrics" | grep 'instance_rss_bytes{instance_uuid=' | cut -d' ' -f2) + rss_bytes=$(echo "$metrics" | grep 'instance_rss_bytes{instance_uuid=' | cut -d' ' -f2) + rss=$(echo "scale=2; $rss_bytes / 1048576" | bc) cpu_time=$(echo "$metrics" | grep 'instance_cpu_time_s{instance_uuid=' | cut -d' ' -f2) - tx_bytes=$(echo "$metrics" | grep 'instance_tx_bytes{instance_uuid=' | cut -d' ' -f2) + tx_bytes_raw=$(echo "$metrics" | grep 'instance_tx_bytes{instance_uuid=' | cut -d' ' -f2) + tx_bytes=$(echo "scale=2; $tx_bytes_raw / 1048576" | bc) nconns=$(echo "$metrics" | grep 'instance_nconns{instance_uuid=' | cut -d' ' -f2) nreqs=$(echo "$metrics" | grep 'instance_nreqs{instance_uuid=' | cut -d' ' -f2) nqueued=$(echo "$metrics" | grep 'instance_nqueued{instance_uuid=' | cut -d' ' -f2) ntotal=$(echo "$metrics" | grep 'instance_ntotal{instance_uuid=' | cut -d' ' -f2) - echo -e "$rss\t$cpu_time\t$tx_bytes\t$nconns\t$nreqs\t$nqueued\t$ntotal" + + # Calculate CPU percentage (ensure it's >= 0) + if [ -n "$prev_cpu_time" ] && [ -n "$cpu_time" ]; then + cpu_diff=$(echo "scale=6; $cpu_time - $prev_cpu_time" | bc) + cpu_percent=$(echo "scale=2; $cpu_diff * 100" | bc) + # Ensure CPU percentage is not negative + if (( $(echo "$cpu_percent < 0" | bc -l) )); then + cpu_percent="0.00" + fi + # Format to exactly 2 decimal places + cpu_percent=$(printf "%.2f" "$cpu_percent") + else + cpu_percent="0.00" + fi + + # Calculate network speed in KB/s + if [ -n "$prev_tx_bytes" ] && [ -n "$tx_bytes_raw" ]; then + tx_diff=$(echo "scale=6; $tx_bytes_raw - $prev_tx_bytes" | bc) + tx_kbps=$(echo "scale=2; $tx_diff / 1024" | bc) + # Ensure network speed is not negative + if (( $(echo "$tx_kbps < 0" | bc -l) )); then + tx_kbps="0.00" + fi + else + tx_kbps="0.00" + fi + + echo -e "$timestamp\t${rss}MB\t${cpu_percent}%\t${tx_bytes}MB\t${tx_kbps}KB/s\t$nconns\t$nreqs\t$nqueued\t$ntotal" + + # Store current values for next iteration + prev_cpu_time="$cpu_time" + prev_tx_bytes="$tx_bytes_raw" sleep 1 done