Skip to content

Commit 577a24a

Browse files
authored
remove ping (#66)
1 parent 330685e commit 577a24a

File tree

9 files changed

+178
-94
lines changed

9 files changed

+178
-94
lines changed

images/chromium-headful/run-unikernel.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@ trap 'rm -rf "$FLAGS_DIR"' EXIT
4141

4242

4343
deploy_args=(
44+
--vcpus 4
4445
-M 4096
4546
-p 9222:9222/tls
4647
-p 444:10001/tls
4748
-e DISPLAY_NUM=1
4849
-e HEIGHT=768
4950
-e WIDTH=1024
50-
-e RUN_AS_ROOT="$RUN_AS_ROOT" \
51+
-e RUN_AS_ROOT="$RUN_AS_ROOT"
52+
-e LOG_CDP_MESSAGES=true
5153
-v "$volume_name":/chromium
5254
-n "$NAME"
5355
)

images/chromium-headful/supervisor/services/kernel-images-api.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[program:kernel-images-api]
2-
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" exec /usr/local/bin/kernel-images-api'
2+
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
33
autostart=false
44
autorestart=true
55
startsecs=2

images/chromium-headless/image/supervisor/services/kernel-images-api.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[program:kernel-images-api]
2-
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" exec /usr/local/bin/kernel-images-api'
2+
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
33
autostart=false
44
autorestart=true
55
startsecs=2

images/chromium-headless/run-unikernel.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ deploy_args=(
1515
--scale-to-zero idle
1616
--scale-to-zero-cooldown 3000ms
1717
--scale-to-zero-stateful
18+
--vcpus 1
1819
-M 1024
1920
-e RUN_AS_ROOT="$RUN_AS_ROOT"
21+
-e LOG_CDP_MESSAGES=true \
2022
-p 9222:9222/tls
2123
-p 444:10001/tls
22-
--vcpus 2
2324
-n "$NAME"
2425
)
2526

server/cmd/api/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func main() {
141141
})
142142
})
143143
rDevtools.Get("/*", func(w http.ResponseWriter, r *http.Request) {
144-
devtoolsproxy.WebSocketProxyHandler(upstreamMgr, slogger).ServeHTTP(w, r)
144+
devtoolsproxy.WebSocketProxyHandler(upstreamMgr, slogger, config.LogCDPMessages).ServeHTTP(w, r)
145145
})
146146

147147
srvDevtools := &http.Server{

server/cmd/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ type Config struct {
1919

2020
// Absolute or relative path to the ffmpeg binary. If empty the code falls back to "ffmpeg" on $PATH.
2121
PathToFFmpeg string `envconfig:"FFMPEG_PATH" default:"ffmpeg"`
22+
23+
// DevTools proxy configuration
24+
LogCDPMessages bool `envconfig:"LOG_CDP_MESSAGES" default:"false"`
2225
}
2326

2427
// Load loads configuration from environment variables

server/lib/devtoolsproxy/proxy.go

Lines changed: 121 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"net/url"
1111
"os/exec"
1212
"regexp"
13+
"strconv"
1314
"strings"
1415
"sync"
1516
"sync/atomic"
@@ -145,8 +146,8 @@ func (u *UpstreamManager) runTailOnce(ctx context.Context) {
145146

146147
// WebSocketProxyHandler returns an http.Handler that upgrades incoming connections and
147148
// proxies them to the current upstream websocket URL. It expects only websocket requests.
148-
func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handler {
149-
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
149+
// If logCDPMessages is true, all CDP messages will be logged with their direction.
150+
func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger, logCDPMessages bool) http.Handler {
150151
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
151152
upstreamCurrent := mgr.Current()
152153
if upstreamCurrent == "" {
@@ -160,17 +161,41 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handl
160161
}
161162
// Always use the full upstream path and query, ignoring the client's request path/query
162163
upstreamURL := (&url.URL{Scheme: parsed.Scheme, Host: parsed.Host, Path: parsed.Path, RawQuery: parsed.RawQuery}).String()
164+
upgrader := websocket.Upgrader{
165+
ReadBufferSize: 65536,
166+
WriteBufferSize: 65536,
167+
EnableCompression: true,
168+
CheckOrigin: func(r *http.Request) bool { return true },
169+
}
170+
logger.Info("upgrader config", slog.Any("upgrader", upgrader))
163171
clientConn, err := upgrader.Upgrade(w, r, nil)
164172
if err != nil {
165173
logger.Error("websocket upgrade failed", slog.String("err", err.Error()))
166174
return
167175
}
168-
upstreamConn, _, err := websocket.DefaultDialer.Dial(upstreamURL, nil)
176+
clientConn.SetReadDeadline(time.Time{}) // No timeout--hold on to connections for dear life
177+
clientConn.SetWriteDeadline(time.Time{}) // No timeout--hold on to connections for dear life
178+
clientConn.SetReadLimit(100 * 1024 * 1024) // 100 MB. Effectively no maximum size of message from client
179+
clientConn.EnableWriteCompression(true)
180+
clientConn.SetCompressionLevel(6)
181+
182+
dialer := websocket.Dialer{
183+
ReadBufferSize: 65536,
184+
WriteBufferSize: 65536,
185+
HandshakeTimeout: 30 * time.Second,
186+
}
187+
logger.Info("dialer config", slog.Any("dialer", dialer))
188+
upstreamConn, _, err := dialer.Dial(upstreamURL, nil)
169189
if err != nil {
170190
logger.Error("dial upstream failed", slog.String("err", err.Error()), slog.String("url", upstreamURL))
171191
_ = clientConn.Close()
172192
return
173193
}
194+
upstreamConn.SetReadLimit(100 * 1024 * 1024) // 100 MB. Effectively no maximum size of message from upstream
195+
upstreamConn.EnableWriteCompression(true)
196+
upstreamConn.SetCompressionLevel(6)
197+
upstreamConn.SetReadDeadline(time.Time{}) // no timeout
198+
upstreamConn.SetWriteDeadline(time.Time{}) // no timeout
174199
logger.Debug("proxying devtools websocket", slog.String("url", upstreamURL))
175200

176201
var once sync.Once
@@ -180,7 +205,7 @@ func WebSocketProxyHandler(mgr *UpstreamManager, logger *slog.Logger) http.Handl
180205
_ = clientConn.Close()
181206
})
182207
}
183-
proxyWebSocket(r.Context(), clientConn, upstreamConn, cleanup, logger)
208+
proxyWebSocket(r.Context(), clientConn, upstreamConn, cleanup, logger, logCDPMessages)
184209
})
185210
}
186211

@@ -190,47 +215,103 @@ type wsConn interface {
190215
Close() error
191216
}
192217

193-
func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClose func(), logger *slog.Logger) {
194-
errChan := make(chan error, 2)
218+
// logCDPMessage logs a CDP message with its direction if logging is enabled
219+
func logCDPMessage(logger *slog.Logger, direction string, mt int, msg []byte) {
220+
if mt != websocket.TextMessage {
221+
return // Only log text messages (CDP messages)
222+
}
223+
224+
// Extract fields using regex from raw message
225+
rawMsg := string(msg)
226+
227+
// Regex patterns to match "key":"val" or "key": "val" for string values
228+
extractStringField := func(key string) string {
229+
pattern := fmt.Sprintf(`"%s"\s*:\s*"([^"]*)"`, key)
230+
re := regexp.MustCompile(pattern)
231+
matches := re.FindStringSubmatch(rawMsg)
232+
if len(matches) > 1 {
233+
return matches[1]
234+
}
235+
return ""
236+
}
237+
238+
// Regex pattern to match "key": number for numeric id
239+
extractNumberField := func(key string) interface{} {
240+
pattern := fmt.Sprintf(`"%s"\s*:\s*(\d+)`, key)
241+
re := regexp.MustCompile(pattern)
242+
matches := re.FindStringSubmatch(rawMsg)
243+
if len(matches) > 1 {
244+
// Try to parse as int first
245+
if val, err := strconv.Atoi(matches[1]); err == nil {
246+
return val
247+
}
248+
// Fall back to float64
249+
if val, err := strconv.ParseFloat(matches[1], 64); err == nil {
250+
return val
251+
}
252+
}
253+
return nil
254+
}
255+
256+
// Extract fields using regex
257+
method := extractStringField("method")
258+
id := extractNumberField("id")
259+
sessionId := extractStringField("sessionId")
260+
targetId := extractStringField("targetId")
261+
frameId := extractStringField("frameId")
262+
263+
// Build log attributes, only including non-empty values
264+
attrs := []slog.Attr{
265+
slog.String("dir", direction),
266+
}
267+
268+
if sessionId != "" {
269+
attrs = append(attrs, slog.String("sessionId", sessionId))
270+
}
271+
if targetId != "" {
272+
attrs = append(attrs, slog.String("targetId", targetId))
273+
}
274+
if id != nil {
275+
attrs = append(attrs, slog.Any("id", id))
276+
}
277+
if frameId != "" {
278+
attrs = append(attrs, slog.String("frameId", frameId))
279+
}
280+
281+
if method != "" {
282+
attrs = append(attrs, slog.String("method", method))
283+
}
284+
285+
attrs = append(attrs, slog.Int("raw_length", len(msg)))
195286

196-
// Single-writer guarantee for client connection
197-
var clientWriteMu sync.Mutex
287+
// Convert attrs to individual slog.Attr arguments
288+
args := make([]any, len(attrs))
289+
for i, attr := range attrs {
290+
args[i] = attr
291+
}
198292

199-
// Heartbeat tracking
200-
var hbMu sync.Mutex
201-
lastClientActivity := time.Now()
202-
var lastPingSent time.Time
203-
var lastPongReceived time.Time
204-
var outstandingPing bool
293+
logger.Info("cdp", args...)
294+
}
295+
296+
func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClose func(), logger *slog.Logger, logCDPMessages bool) {
297+
errChan := make(chan error, 2)
205298

206299
go func() {
207300
for {
208301
mt, msg, err := clientConn.ReadMessage()
209302
if err != nil {
303+
logger.Error("client read error", slog.String("err", err.Error()))
210304
errChan <- err
211305
break
212306
}
213-
// Record any client activity
214-
hbMu.Lock()
215-
lastClientActivity = time.Now()
216-
hbMu.Unlock()
217-
218-
// Handle control frames from client
219-
if mt == websocket.PongMessage {
220-
hbMu.Lock()
221-
lastPongReceived = time.Now()
222-
outstandingPing = false
223-
hbMu.Unlock()
224-
continue
225-
}
226-
if mt == websocket.PingMessage {
227-
clientWriteMu.Lock()
228-
_ = clientConn.WriteMessage(websocket.PongMessage, nil)
229-
clientWriteMu.Unlock()
230-
continue
307+
308+
// Log CDP messages if enabled
309+
if logCDPMessages {
310+
logCDPMessage(logger, "->", mt, msg)
231311
}
232312

233313
if err := upstreamConn.WriteMessage(mt, msg); err != nil {
314+
logger.Error("upstream write error", slog.String("err", err.Error()))
234315
errChan <- err
235316
break
236317
}
@@ -240,65 +321,21 @@ func proxyWebSocket(ctx context.Context, clientConn, upstreamConn wsConn, onClos
240321
for {
241322
mt, msg, err := upstreamConn.ReadMessage()
242323
if err != nil {
324+
logger.Error("upstream read error", slog.String("err", err.Error()))
243325
errChan <- err
244326
break
245327
}
246-
clientWriteMu.Lock()
328+
329+
// Log CDP messages if enabled
330+
if logCDPMessages {
331+
logCDPMessage(logger, "<-", mt, msg)
332+
}
333+
247334
if err := clientConn.WriteMessage(mt, msg); err != nil {
248-
clientWriteMu.Unlock()
335+
logger.Error("client write error", slog.String("err", err.Error()))
249336
errChan <- err
250337
break
251338
}
252-
clientWriteMu.Unlock()
253-
}
254-
}()
255-
256-
// Heartbeat goroutine
257-
go func() {
258-
ticker := time.NewTicker(5 * time.Second)
259-
defer ticker.Stop()
260-
for {
261-
select {
262-
case <-ctx.Done():
263-
return
264-
case <-ticker.C:
265-
now := time.Now()
266-
hbMu.Lock()
267-
inactivity := now.Sub(lastClientActivity)
268-
pingOutstanding := outstandingPing
269-
lastPing := lastPingSent
270-
lastPong := lastPongReceived
271-
hbMu.Unlock()
272-
273-
if pingOutstanding {
274-
if now.Sub(lastPing) > 10*time.Second && lastPong.Before(lastPing) {
275-
logger.Warn("client ping timeout; closing devtools websocket")
276-
select {
277-
case errChan <- fmt.Errorf("ping timeout"):
278-
default:
279-
}
280-
return
281-
}
282-
continue
283-
}
284-
285-
if inactivity >= 30*time.Second {
286-
clientWriteMu.Lock()
287-
pingErr := clientConn.WriteMessage(websocket.PingMessage, nil)
288-
clientWriteMu.Unlock()
289-
if pingErr != nil {
290-
select {
291-
case errChan <- pingErr:
292-
default:
293-
}
294-
return
295-
}
296-
hbMu.Lock()
297-
lastPingSent = now
298-
outstandingPing = true
299-
hbMu.Unlock()
300-
}
301-
}
302339
}
303340
}()
304341

server/lib/devtoolsproxy/proxy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func TestWebSocketProxyHandler_ProxiesEcho(t *testing.T) {
9898
// seed current upstream to echo server including path/query (bypass tailing)
9999
mgr.setCurrent((&url.URL{Scheme: u.Scheme, Host: u.Host, Path: u.Path, RawQuery: u.RawQuery}).String())
100100

101-
proxy := WebSocketProxyHandler(mgr, logger)
101+
proxy := WebSocketProxyHandler(mgr, logger, false)
102102
proxySrv := httptest.NewServer(proxy)
103103
defer proxySrv.Close()
104104

0 commit comments

Comments
 (0)