From 3f75ef67ae447860010a303201877743d4c2aa8c Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 12 Mar 2026 15:02:14 +0000 Subject: [PATCH 1/3] fix(backend): stream event logs and reduce memory pressure The backend was loading entire AG-UI event log files into memory via os.ReadFile(), causing OOMKill on UAT with 512Mi limit. - Replace os.ReadFile with bufio.Scanner for streaming event reads - Apply same streaming fix to readJSONLFile in export.go - Add io.LimitReader to error response body reads (capped at 1KB) - Increase session map cleanup frequency (10min -> 2min) - Reduce stale session threshold (1hr -> 10min) - Add K8s-level pagination to ListSessions Fixes: RHOAIENG-52922 Co-Authored-By: Claude Opus 4.6 --- components/backend/handlers/sessions.go | 26 +++++++++++++++++----- components/backend/websocket/agui_proxy.go | 13 ++++++----- components/backend/websocket/agui_store.go | 19 ++++++++++++---- components/backend/websocket/export.go | 12 ++++++---- 4 files changed, 52 insertions(+), 18 deletions(-) mode change 100644 => 100755 components/backend/handlers/sessions.go mode change 100644 => 100755 components/backend/websocket/agui_proxy.go mode change 100644 => 100755 components/backend/websocket/agui_store.go mode change 100644 => 100755 components/backend/websocket/export.go diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go old mode 100644 new mode 100755 index 29bf7f775..92484f571 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -34,6 +34,9 @@ import ( "k8s.io/client-go/kubernetes" ) +// k8sListPageSize bounds per-call memory when listing CRs from the API server. +const k8sListPageSize = 500 + // Package-level variables for session handlers (set from main package) var ( GetAgenticSessionV1Alpha1Resource func() schema.GroupVersionResource @@ -380,21 +383,34 @@ func ListSessions(c *gin.Context) { } types.NormalizePaginationParams(¶ms) - // Build list options with pagination - // Note: Kubernetes List with Limit returns a continue token for server-side pagination - // We use offset-based pagination on top of fetching all items for search/sort flexibility + // Build list options with K8s-level pagination to avoid loading all CRs into memory. + // We still need all items for search/sort, but fetching in pages bounds per-call memory. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - list, err := k8sDyn.Resource(gvr).Namespace(project).List(ctx, v1.ListOptions{}) + listOpts := v1.ListOptions{Limit: k8sListPageSize} + list, err := k8sDyn.Resource(gvr).Namespace(project).List(ctx, listOpts) if err != nil { log.Printf("Failed to list agentic sessions in project %s: %v", project, err) c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"}) return } + // Fetch remaining pages if the list was truncated by the Limit. + allItems := list.Items + for list.GetContinue() != "" { + listOpts.Continue = list.GetContinue() + list, err = k8sDyn.Resource(gvr).Namespace(project).List(ctx, listOpts) + if err != nil { + log.Printf("Failed to list agentic sessions (continue) in project %s: %v", project, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"}) + return + } + allItems = append(allItems, list.Items...) + } + var sessions []types.AgenticSession - for _, item := range list.Items { + for _, item := range allItems { meta, _, err := unstructured.NestedMap(item.Object, "metadata") if err != nil { log.Printf("ListSessions: failed to read metadata for %s/%s: %v", project, item.GetName(), err) diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go old mode 100644 new mode 100755 index 18a8e211a..b517f7fda --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -41,6 +41,9 @@ const ( // activityUpdateTimeout bounds how long a single activity status update can take. activityUpdateTimeout = 10 * time.Second + + // maxErrorBodyBytes caps how much of an error response body we read into memory. + maxErrorBodyBytes = 1024 ) // activityUpdateSem limits concurrent goroutines spawned by updateLastActivityTime. @@ -70,10 +73,10 @@ var sessionLastSeen sync.Map // staleSessionThreshold is the duration after which an inactive session's // cached data is pruned from the in-memory maps. -const staleSessionThreshold = 1 * time.Hour +const staleSessionThreshold = 10 * time.Minute // staleSessionCleanupInterval is how often the cleanup goroutine runs. -const staleSessionCleanupInterval = 10 * time.Minute +const staleSessionCleanupInterval = 2 * time.Minute func init() { go cleanupStaleSessions() @@ -321,7 +324,7 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, t defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes)) log.Printf("AGUI Proxy: runner returned %d: %s", resp.StatusCode, string(body)) publishAndPersistErrorEvents(sessionName, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode)) return @@ -484,7 +487,7 @@ func HandleAGUIInterrupt(c *gin.Context) { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes)) c.JSON(resp.StatusCode, gin.H{"error": string(body)}) return } @@ -549,7 +552,7 @@ func HandleAGUIFeedback(c *gin.Context) { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes)) log.Printf("AGUI Feedback: runner returned %d for %s: %s", resp.StatusCode, sessionName, string(body)) c.JSON(resp.StatusCode, gin.H{"error": "Runner rejected feedback", "status": "failed"}) return diff --git a/components/backend/websocket/agui_store.go b/components/backend/websocket/agui_store.go old mode 100644 new mode 100755 index a6ae9deec..a27f0e824 --- a/components/backend/websocket/agui_store.go +++ b/components/backend/websocket/agui_store.go @@ -11,6 +11,8 @@ package websocket import ( "ambient-code-backend/types" + "bufio" + "bytes" "encoding/json" "fmt" "log" @@ -157,13 +159,14 @@ func persistEvent(sessionID string, event map[string]interface{}) { // ─── Read path ─────────────────────────────────────────────────────── -// loadEvents reads all AG-UI events for a session from the JSONL log. +// loadEvents reads all AG-UI events for a session from the JSONL log +// using a streaming scanner to avoid loading the entire file into memory. // Automatically triggers legacy migration if the log doesn't exist but // a pre-AG-UI messages.jsonl file does. func loadEvents(sessionID string) []map[string]interface{} { path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) - data, err := os.ReadFile(path) + f, err := os.Open(path) if err != nil { if os.IsNotExist(err) { // Attempt legacy migration (messages.jsonl → agui-events.jsonl) @@ -171,7 +174,7 @@ func loadEvents(sessionID string) []map[string]interface{} { log.Printf("AGUI Store: legacy migration failed for %s: %v", sessionID, mErr) } // Retry after migration - data, err = os.ReadFile(path) + f, err = os.Open(path) if err != nil { return nil } @@ -180,9 +183,14 @@ func loadEvents(sessionID string) []map[string]interface{} { return nil } } + defer f.Close() events := make([]map[string]interface{}, 0, 64) - for _, line := range splitLines(data) { + scanner := bufio.NewScanner(f) + // Allow lines up to 1MB (default 64KB may truncate large tool outputs) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + line := scanner.Bytes() if len(line) == 0 { continue } @@ -191,6 +199,9 @@ func loadEvents(sessionID string) []map[string]interface{} { events = append(events, evt) } } + if err := scanner.Err(); err != nil { + log.Printf("AGUI Store: error scanning event log for %s: %v", sessionID, err) + } return events } diff --git a/components/backend/websocket/export.go b/components/backend/websocket/export.go old mode 100644 new mode 100755 index 1651359db..4d3284a74 --- a/components/backend/websocket/export.go +++ b/components/backend/websocket/export.go @@ -2,6 +2,7 @@ package websocket import ( + "bufio" "context" "encoding/json" "fmt" @@ -199,15 +200,18 @@ func isValidSessionName(name string) bool { // readJSONLFile reads a JSONL file and returns parsed array of objects func readJSONLFile(path string) ([]map[string]interface{}, error) { - data, err := os.ReadFile(path) + f, err := os.Open(path) if err != nil { return nil, err } + defer f.Close() var events []map[string]interface{} - lines := splitLines(data) + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) - for _, line := range lines { + for scanner.Scan() { + line := scanner.Bytes() if len(line) == 0 { continue } @@ -220,5 +224,5 @@ func readJSONLFile(path string) ([]map[string]interface{}, error) { events = append(events, event) } - return events, nil + return events, scanner.Err() } From 983d27d6d156ad316145208e62abbc6cccc298fe Mon Sep 17 00:00:00 2001 From: "ambient-code[bot]" Date: Thu, 12 Mar 2026 20:21:14 +0000 Subject: [PATCH 2/3] fix: address review feedback and code quality issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Revert aggressive cache timeouts (10min → 1hr) to prevent premature eviction of active sessions - Extract scanner buffer size constants to eliminate magic numbers - Remove unused bytes import - Fix file permissions (755 → 644) The 10-minute staleSessionThreshold would cause active SSE streams to lose their cached runner port, breaking /agui/interrupt and /agui/feedback calls. Co-Authored-By: Claude --- components/backend/handlers/sessions.go | 0 components/backend/websocket/agui_proxy.go | 4 ++-- components/backend/websocket/agui_store.go | 9 +++++++-- components/backend/websocket/export.go | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) mode change 100755 => 100644 components/backend/handlers/sessions.go mode change 100755 => 100644 components/backend/websocket/agui_proxy.go mode change 100755 => 100644 components/backend/websocket/agui_store.go mode change 100755 => 100644 components/backend/websocket/export.go diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go old mode 100755 new mode 100644 diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go old mode 100755 new mode 100644 index b517f7fda..862c2ba5e --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -73,10 +73,10 @@ var sessionLastSeen sync.Map // staleSessionThreshold is the duration after which an inactive session's // cached data is pruned from the in-memory maps. -const staleSessionThreshold = 10 * time.Minute +const staleSessionThreshold = 1 * time.Hour // staleSessionCleanupInterval is how often the cleanup goroutine runs. -const staleSessionCleanupInterval = 2 * time.Minute +const staleSessionCleanupInterval = 10 * time.Minute func init() { go cleanupStaleSessions() diff --git a/components/backend/websocket/agui_store.go b/components/backend/websocket/agui_store.go old mode 100755 new mode 100644 index a27f0e824..18deebfd3 --- a/components/backend/websocket/agui_store.go +++ b/components/backend/websocket/agui_store.go @@ -12,7 +12,6 @@ package websocket import ( "ambient-code-backend/types" "bufio" - "bytes" "encoding/json" "fmt" "log" @@ -55,6 +54,12 @@ func evictStaleWriteMutexes() { // Set from the STATE_BASE_DIR env var (default "/workspace") at startup. var StateBaseDir string +const ( + // Scanner buffer sizes for reading JSONL files + scannerInitialBufferSize = 64 * 1024 // 64KB initial buffer + scannerMaxLineSize = 1024 * 1024 // 1MB max line size +) + // ─── Live event pipe (multi-client broadcast) ─────────────────────── // The run handler pipes raw SSE lines to ALL connect handlers tailing // the same session. Zero latency — same as the direct run() path. @@ -188,7 +193,7 @@ func loadEvents(sessionID string) []map[string]interface{} { events := make([]map[string]interface{}, 0, 64) scanner := bufio.NewScanner(f) // Allow lines up to 1MB (default 64KB may truncate large tool outputs) - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize) for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { diff --git a/components/backend/websocket/export.go b/components/backend/websocket/export.go old mode 100755 new mode 100644 index 4d3284a74..478670ee0 --- a/components/backend/websocket/export.go +++ b/components/backend/websocket/export.go @@ -208,7 +208,7 @@ func readJSONLFile(path string) ([]map[string]interface{}, error) { var events []map[string]interface{} scanner := bufio.NewScanner(f) - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize) for scanner.Scan() { line := scanner.Bytes() From a172cdc7b52016154cfda5455cf4a92708f1c1f2 Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Thu, 12 Mar 2026 22:42:29 -0500 Subject: [PATCH 3/3] style(backend): fix gofmt alignment in scanner constants Co-Authored-By: Claude Opus 4.6 (1M context) --- components/backend/websocket/agui_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/backend/websocket/agui_store.go b/components/backend/websocket/agui_store.go index 18deebfd3..548289823 100644 --- a/components/backend/websocket/agui_store.go +++ b/components/backend/websocket/agui_store.go @@ -56,8 +56,8 @@ var StateBaseDir string const ( // Scanner buffer sizes for reading JSONL files - scannerInitialBufferSize = 64 * 1024 // 64KB initial buffer - scannerMaxLineSize = 1024 * 1024 // 1MB max line size + scannerInitialBufferSize = 64 * 1024 // 64KB initial buffer + scannerMaxLineSize = 1024 * 1024 // 1MB max line size ) // ─── Live event pipe (multi-client broadcast) ───────────────────────