fix(backend): stream event logs to prevent OOMKill #890
Walkthrough
Introduce memory-usage safeguards: paginate Kubernetes session listings, cap error-response body reads, and stream large JSONL/event files line-by-line with retry-on-migration for missing files.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
components/backend/websocket/agui_store.go (1)
162-205: ⚠️ Potential issue | 🟠 Major
loadEvents still materializes the entire event log into memory, and now fails silently on large individual events.
The change from os.ReadFile to bufio.Scanner avoids one buffer allocation but doesn't address the core issue: every decoded event map is appended to the events slice before being returned and replayed. For long-running sessions, this still causes the same OOM scenario. Additionally, if any single JSONL row exceeds the 1 MiB scanner buffer, Scan() returns false and scanner.Err() reports bufio.ErrTooLong; the loop exits, and HandleAGUIEvents returns a partial event history to the client with no indication of truncation. To fix both issues, the replay logic needs to stream events without first building the complete slice in memory.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/backend/websocket/agui_store.go` around lines 162 - 205, loadEvents currently reads and decodes every JSONL event into an in-memory slice and can silently fail on lines >1MiB; change it to stream-decoded events instead of materializing them: replace loadEvents(sessionID string) []map[string]interface{} with a streaming variant used by HandleAGUIEvents such as loadEvents(sessionID string, emit func(map[string]interface{}) error) error (or return a receive-only channel), keep the existing migration logic (MigrateLegacySessionToAGUI) and file/open retry, then inside the scanner loop unmarshal each line and immediately call emit(evt) (handling and propagating errors from emit), and explicitly detect scanner.Err() and bufio.ErrTooLong to log/return a clear error instead of returning a truncated partial slice.
components/backend/websocket/export.go (1)
202-227: ⚠️ Potential issue | 🟠 Major
This still materializes large exports in memory and caps individual JSONL lines at 1 MiB.
Although file reading is now streaming via bufio.Scanner, the entire decoded dataset is accumulated into the events slice (line 224), then fully re-marshaled with json.MarshalIndent (lines 120, 144) before being sent to the client. This approach fails to address the original peak-memory concern for large session exports. Additionally, any single JSONL line exceeding 1 MiB will cause the scanner to abort with ErrTooLong (line 211 sets the hard limit), making large events unexportable. To handle large exports safely, stream the JSON array directly to c.Writer without buffering the full decoded dataset or imposing per-line size limits.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/backend/websocket/export.go` around lines 202 - 227, The current readJSONLFile function plus subsequent json.MarshalIndent calls (used earlier when writing to c.Writer) accumulate the whole export in memory and cap per-line size via Scanner.Buffer; instead, remove readJSONLFile usage and stream the JSONL file directly to c.Writer: open the file, set response headers, write the '[' then iterate over lines using a reader that doesn't impose a 1MiB scanner limit (e.g., bufio.Reader.ReadBytes('\n') or io.Reader with json.Decoder/streaming), for each line validate or wrap as json.RawMessage and write comma-separated entries directly to c.Writer (flushing if using a streaming context), then write ']' and handle errors; this ensures you never json.MarshalIndent the full events slice and avoids Scanner's ErrTooLong while keeping the export streamed to c.Writer.
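A minimal sketch of the streaming export described in this comment, assuming a plain io.Writer stands in for c.Writer. The names streamJSONLAsArray and exportString are hypothetical and not taken from the PR; malformed lines are skipped here, which may or may not match what the real handler should do.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"strings"
)

// streamJSONLAsArray copies a JSONL stream to w as one JSON array, element by
// element. Only the current line is held in memory, and there is no fixed
// per-line size limit. Empty and invalid lines are skipped.
// (Hypothetical helper for illustration.)
func streamJSONLAsArray(w io.Writer, r io.Reader) error {
	br := bufio.NewReader(r)
	if _, err := io.WriteString(w, "["); err != nil {
		return err
	}
	first := true
	for {
		// ReadBytes returns any data read before an error, including io.EOF,
		// so a final line without a trailing newline is still processed.
		line, readErr := br.ReadBytes('\n')
		line = bytes.TrimSpace(line)
		if len(line) > 0 && json.Valid(line) {
			if !first {
				if _, err := io.WriteString(w, ","); err != nil {
					return err
				}
			}
			if _, err := w.Write(line); err != nil {
				return err
			}
			first = false
		}
		if errors.Is(readErr, io.EOF) {
			break
		}
		if readErr != nil {
			return readErr
		}
	}
	_, err := io.WriteString(w, "]")
	return err
}

// exportString runs the helper over an in-memory string, for experimentation.
func exportString(jsonl string) (string, error) {
	var sb strings.Builder
	err := streamJSONLAsArray(&sb, strings.NewReader(jsonl))
	return sb.String(), err
}
```

One trade-off of this shape: once the opening bracket has been written to an HTTP response, a later read error can no longer change the status code, so the handler would need some other way to signal a truncated export.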
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9a3f8d16-70c8-41f0-acc3-373af6ae6499
📒 Files selected for processing (4)
components/backend/handlers/sessions.go
components/backend/websocket/agui_proxy.go
components/backend/websocket/agui_store.go
components/backend/websocket/export.go
     const staleSessionThreshold = 10 * time.Minute

     // staleSessionCleanupInterval is how often the cleanup goroutine runs.
    -const staleSessionCleanupInterval = 10 * time.Minute
    +const staleSessionCleanupInterval = 2 * time.Minute
Don't expire live session caches after 10 minutes.
sessionLastSeen is only refreshed when HandleAGUIEvents and HandleAGUIRunProxy start handling a request (Lines 141 and 266), not while an SSE stream or runner stream is still active. With a 10-minute threshold, cleanup can evict the cached runner port for a session that is still alive, so later /agui/interrupt, /agui/feedback, and similar calls fall back to handlers.DefaultRunnerPort and break non-default runners.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/backend/websocket/agui_proxy.go` around lines 76 - 79, The
staleSessionThreshold of 10 minutes causes live sessions to be evicted because
sessionLastSeen is only updated when HandleAGUIEvents and HandleAGUIRunProxy
start handling a request; fix by ensuring active streams keep the session alive:
either raise staleSessionThreshold to a much larger value (e.g., hours) or,
preferably, update sessionLastSeen periodically while SSE/runner streams are
active inside HandleAGUIEvents and HandleAGUIRunProxy (e.g., touch
sessionLastSeen on each send/heartbeat or in the stream loop), so cleanup won’t
evict cached runner ports and calls that rely on handlers.DefaultRunnerPort
fallback won’t break non-default runners.
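The preferred option in this comment (refreshing the last-seen time while a stream is open) might look roughly like the sketch below. It assumes sessionLastSeen behaves like a map from session name to timestamp; lastSeen, touch, keepAlive, isStale and demoKeepAlive are hypothetical stand-ins, not the backend's actual identifiers.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// lastSeen stands in for the backend's sessionLastSeen map (assumed shape:
// session name -> time.Time).
var lastSeen sync.Map

// touch records that the session was active at the given time.
func touch(session string, now time.Time) {
	lastSeen.Store(session, now)
}

// keepAlive refreshes the session's last-seen time every interval until ctx
// is cancelled, so a cleanup pass does not treat an open stream as stale.
// A handler would start it when the stream opens and cancel ctx on close.
func keepAlive(ctx context.Context, session string, interval time.Duration) {
	touch(session, time.Now())
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			touch(session, now)
		}
	}
}

// isStale reports whether the session has not been seen within threshold.
func isStale(session string, threshold time.Duration, now time.Time) bool {
	v, ok := lastSeen.Load(session)
	if !ok {
		return true
	}
	return now.Sub(v.(time.Time)) > threshold
}

// demoKeepAlive compares a session with an active keep-alive loop against one
// that was touched once and then left idle.
func demoKeepAlive() (streamingStale, idleStale bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	touch("idle", time.Now())
	go keepAlive(ctx, "streaming", 10*time.Millisecond)
	time.Sleep(300 * time.Millisecond)
	now := time.Now()
	threshold := 150 * time.Millisecond
	return isStale("streaming", threshold, now), isStale("idle", threshold, now)
}
```

With a keep-alive like this, the stale threshold only needs to exceed the refresh interval by a comfortable margin, rather than the longest expected stream duration.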
Fixed — reverted staleSessionThreshold back to 1 hour and staleSessionCleanupInterval back to 10 minutes. You're correct that the 10-minute threshold would cause active SSE streams to lose their cached runner port mapping, breaking /agui/interrupt and /agui/feedback for non-default runners.
The backend was loading entire AG-UI event log files into memory via os.ReadFile(), causing OOMKill on UAT with 512Mi limit.

- Replace os.ReadFile with bufio.Scanner for streaming event reads
- Apply same streaming fix to readJSONLFile in export.go
- Add io.LimitReader to error response body reads (capped at 1KB)
- Increase session map cleanup frequency (10min -> 2min)
- Reduce stale session threshold (1hr -> 10min)
- Add K8s-level pagination to ListSessions

Fixes: RHOAIENG-52922
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

- Revert aggressive cache timeouts (10min → 1hr) to prevent premature eviction of active sessions
- Extract scanner buffer size constants to eliminate magic numbers
- Remove unused bytes import
- Fix file permissions (755 → 644)

The 10-minute staleSessionThreshold would cause active SSE streams to lose their cached runner port, breaking /agui/interrupt and /agui/feedback calls.

Co-Authored-By: Claude <noreply@anthropic.com>
Force-pushed from 3a62f0d to 983d27d
Fix Report
✅ Resolved Merge Conflicts
📝 Addressed Review Feedback
Fixed (1 issue):
Skipped (2 issues):
🔧 Code Quality Improvements
✅ CI Status
📦 Commits Pushed
Status: Ready for review. The original memory optimization changes are intact, with the cache timeout bug fixed and code quality improved.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/backend/websocket/export.go (1)
203-227: ⚠️ Potential issue | 🟠 Major
The 1MB scanner cap will break exports for large events.
As configured here via scanner.Buffer, bufio.Scanner enforces a hard per-line token limit of 1MB. When an event exceeds this size, scanner.Scan() fails and scanner.Err() returns a token-too-long error (bufio.ErrTooLong). For AG-UI events this propagates as a 500 error response, breaking the entire export even though the file is otherwise valid. A reader-based loop using ReadBytes('\n') or ReadString('\n') preserves streaming behavior without introducing a hard per-line ceiling.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/backend/websocket/export.go` around lines 203 - 227, The current bufio.Scanner loop (scanner.Scan()/scanner.Err(), scanner.Buffer(...)) will fail on lines >1MB; replace the scanner-based loop with a bufio.Reader read-loop using ReadBytes('\n') or ReadString('\n') on the opened file (f) to stream lines without the 1MB cap, keep the same behavior: skip empty lines, json.Unmarshal each line into the existing var events []map[string]interface{}, log and continue on malformed JSON, and return events plus any non-EOF read error; remove scanner.Buffer usage and references to scanner.Scan()/scanner.Err().
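To make the difference concrete, the sketch below runs the same input through a capped bufio.Scanner and through a bufio.Reader loop. The function names and the tiny sizes are illustrative assumptions; the PR's real cap is 1MB.

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
	"io"
	"strings"
)

// countWithScanner counts non-empty lines using bufio.Scanner with a maximum
// token size of maxLine bytes. A longer line makes Scan return false early
// and Err return bufio.ErrTooLong.
func countWithScanner(r io.Reader, maxLine int) (int, error) {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 16), maxLine)
	n := 0
	for sc.Scan() {
		if len(bytes.TrimSpace(sc.Bytes())) > 0 {
			n++
		}
	}
	return n, sc.Err()
}

// countWithReader counts non-empty lines using bufio.Reader.ReadBytes, which
// grows its result as needed and so has no fixed per-line ceiling.
func countWithReader(r io.Reader) (int, error) {
	br := bufio.NewReader(r)
	n := 0
	for {
		line, err := br.ReadBytes('\n')
		if len(bytes.TrimSpace(line)) > 0 {
			n++
		}
		if errors.Is(err, io.EOF) {
			return n, nil
		}
		if err != nil {
			return n, err
		}
	}
}

// compare feeds both counters the same three lines: a short one, one of
// longLen bytes, and another short one.
func compare(longLen, maxLine int) (scanned int, tooLong bool, read int) {
	input := "short\n" + strings.Repeat("x", longLen) + "\nlast\n"
	var err error
	scanned, err = countWithScanner(strings.NewReader(input), maxLine)
	tooLong = errors.Is(err, bufio.ErrTooLong)
	read, _ = countWithReader(strings.NewReader(input))
	return scanned, tooLong, read
}
```

Note that the reader loop trades the hard ceiling for unbounded growth on a single pathological line, so a deliberately chosen (and reported) size limit may still be worth keeping.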
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5f8481c4-9af4-4003-98e0-cc7235d7202d
📒 Files selected for processing (4)
components/backend/handlers/sessions.go
components/backend/websocket/agui_proxy.go
components/backend/websocket/agui_store.go
components/backend/websocket/export.go
    +// Build list options with K8s-level pagination to avoid loading all CRs into memory.
    +// We still need all items for search/sort, but fetching in pages bounds per-call memory.
     ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
     defer cancel()

    -list, err := k8sDyn.Resource(gvr).Namespace(project).List(ctx, v1.ListOptions{})
    +listOpts := v1.ListOptions{Limit: k8sListPageSize}
    +list, err := k8sDyn.Resource(gvr).Namespace(project).List(ctx, listOpts)
     if err != nil {
         log.Printf("Failed to list agentic sessions in project %s: %v", project, err)
         c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"})
         return
     }

    +// Fetch remaining pages if the list was truncated by the Limit.
    +allItems := list.Items
    +for list.GetContinue() != "" {
    +    listOpts.Continue = list.GetContinue()
    +    list, err = k8sDyn.Resource(gvr).Namespace(project).List(ctx, listOpts)
    +    if err != nil {
    +        log.Printf("Failed to list agentic sessions (continue) in project %s: %v", project, err)
    +        c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"})
    +        return
    +    }
    +    allItems = append(allItems, list.Items...)
    +}
This still materializes the full session set before paginating.
allItems accumulates every page and the next loop copies them again into sessions, so peak memory is still O(total sessions). In a large namespace this endpoint can still hit the same memory ceiling despite the K8s Limit. At minimum, build sessions page-by-page instead of buffering allItems; longer term, this API needs server-side or cursor-based pagination if the request is supposed to stay bounded.
As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/backend/handlers/sessions.go` around lines 386 - 410, The handler
currently accumulates every page into allItems then copies into sessions,
causing O(total sessions) peak memory; change the paging loop in the sessions
listing logic (where list, listOpts, allItems are used) to append each page's
list.Items directly into the final sessions slice as you fetch them (handle the
initial list.Items then for loop on list.GetContinue() appending list.Items each
iteration) instead of buffering allItems, and remove the intermediate allItems
copy so memory grows only as sessions are built per-page; keep the same error
handling around k8sDyn.Resource(gvr).Namespace(project).List calls and reuse the
existing ctx/cancel.
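The page-by-page shape suggested here can be sketched without the Kubernetes client, assuming a lister that follows the same Limit/Continue contract. All types and names below (page, lister, summary, listAll, fakeLister) are hypothetical; in the real handler the lister would wrap the dynamic client's List call.

```go
package main

import "strconv"

// page stands in for one List response: its items plus a continue token that
// is empty on the last page.
type page struct {
	Items    []string
	Continue string
}

// lister fetches a single page, given a page size and a continue token.
type lister func(limit int, cont string) (page, error)

// summary is the small per-session value the handler keeps for search/sort.
type summary struct{ Name string }

// listAll converts each page into summaries as soon as it arrives, so only one
// raw page is alive at a time next to the (smaller) result slice.
func listAll(list lister, limit int) ([]summary, error) {
	var out []summary
	cont := ""
	for {
		p, err := list(limit, cont)
		if err != nil {
			return nil, err
		}
		for _, it := range p.Items {
			out = append(out, summary{Name: it})
		}
		if p.Continue == "" {
			return out, nil
		}
		cont = p.Continue
	}
}

// fakeLister serves items named s0..s{total-1} in pages of limit, using the
// next start index as the continue token.
func fakeLister(total int) lister {
	return func(limit int, cont string) (page, error) {
		if limit <= 0 {
			limit = total // treat a non-positive limit as "no paging"
		}
		start := 0
		if cont != "" {
			n, err := strconv.Atoi(cont)
			if err != nil {
				return page{}, err
			}
			start = n
		}
		end := start + limit
		if end > total {
			end = total
		}
		p := page{}
		for i := start; i < end; i++ {
			p.Items = append(p.Items, "s"+strconv.Itoa(i))
		}
		if end < total {
			p.Continue = strconv.Itoa(end)
		}
		return p, nil
	}
}
```

As the comment points out, the result slice is still proportional to the total number of sessions; this only avoids holding every raw page alongside it.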
    "bufio"
    "encoding/json"
Run gofmt on this file before merge.
The lint job is already red for websocket/agui_store.go, so this PR will not pass CI as-is.
As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
Also applies to: 57-61
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/backend/websocket/agui_store.go` around lines 14 - 15, Run gofmt
(and optionally goimports) on the agui_store.go source to fix formatting issues
in the import block and the surrounding code; specifically reformat the import
block that currently lists "bufio" and "encoding/json" and any code around the
region that triggered the lint (the area noted in the review) so the file passes
gofmt checks and CI.
    const (
        // Scanner buffer sizes for reading JSONL files
        scannerInitialBufferSize = 64 * 1024   // 64KB initial buffer
        scannerMaxLineSize       = 1024 * 1024 // 1MB max line size
    )
🧩 Analysis chain
🏁 Script executed:
    #!/bin/bash
    rg -n -C2 'scannerMaxLineSize|bufio.NewScanner|scanner.Buffer|scanner.Err|loadEvents\(' \
      components/backend/websocket/agui_store.go \
      components/backend/websocket/agui_proxy.go
Repository: ambient-code/platform
Length of output: 2700
🏁 Script executed:
    # Get the complete loadEvents function implementation
    sed -n '171,210p' components/backend/websocket/agui_store.go
Repository: ambient-code/platform
Length of output: 1303
🏁 Script executed:
    # Check the context around the loadEvents call in agui_proxy.go
    sed -n '145,170p' components/backend/websocket/agui_proxy.go
Repository: ambient-code/platform
Length of output: 1000
🏁 Script executed:
    # Look for any downstream validation or error checking on the returned events
    rg -n -A10 'events := loadEvents' components/backend/websocket/agui_proxy.go
Repository: ambient-code/platform
Length of output: 383
The scanner's 1MB line limit can silently truncate event replay without signaling failure.
When any JSONL line exceeds 1MB, Scanner.Scan() stops and scanner.Err() returns a non-nil error. However, loadEvents() only logs this error and returns the partial events slice already parsed. The caller in agui_proxy.go (line 156) has no way to detect incomplete parsing and treats the truncated slice as the complete replay history, so clients receive an incomplete session state. Either switch to a reader-based loop with proper error propagation, or add an error return type to loadEvents() and fail the request visibly on scan errors.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/backend/websocket/agui_store.go` around lines 57 - 61, The
scanner's 1MB max line size (scannerMaxLineSize) can cause Scanner.Scan() to
stop and produce scanner.Err(), but loadEvents() currently swallows that and
returns a partial events slice; change loadEvents to return (events []Event, err
error) and, when scanner.Err() != nil, return the error instead of the partial
slice; update the caller in agui_proxy.go (the code at the call site around line
156) to handle the error and fail the request (or propagate it) instead of using
truncated events; keep the scanner constants but ensure error propagation so
oversized lines are detected and handled.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
♻️ Duplicate comments (1)
components/backend/websocket/agui_store.go (1)
171-210: ⚠️ Potential issue | 🟠 Major
Propagate scan failures instead of returning partial replay data.
At Line 207, scan errors are only logged, and Line 210 still returns partial events. That can silently replay corrupted/incomplete history to reconnecting clients. Return an error from loadEvents and make the caller fail the request (or surface the error) instead of using truncated data.
Suggested fix
    -func loadEvents(sessionID string) []map[string]interface{} {
    +func loadEvents(sessionID string) ([]map[string]interface{}, error) {
         path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID)

         f, err := os.Open(path)
         if err != nil {
             if os.IsNotExist(err) {
                 // Attempt legacy migration (messages.jsonl → agui-events.jsonl)
                 if mErr := MigrateLegacySessionToAGUI(sessionID); mErr != nil {
                     log.Printf("AGUI Store: legacy migration failed for %s: %v", sessionID, mErr)
                 }
                 // Retry after migration
                 f, err = os.Open(path)
                 if err != nil {
    -                return nil
    +                return nil, nil
                 }
             } else {
                 log.Printf("AGUI Store: failed to read event log for %s: %v", sessionID, err)
    -            return nil
    +            return nil, err
             }
         }
         defer f.Close()

         events := make([]map[string]interface{}, 0, 64)
         scanner := bufio.NewScanner(f)
         scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize)
         for scanner.Scan() {
             line := scanner.Bytes()
             if len(line) == 0 {
                 continue
             }
             var evt map[string]interface{}
             if err := json.Unmarshal(line, &evt); err == nil {
                 events = append(events, evt)
             }
         }
         if err := scanner.Err(); err != nil {
    -        log.Printf("AGUI Store: error scanning event log for %s: %v", sessionID, err)
    +        return nil, fmt.Errorf("scan event log for %s: %w", sessionID, err)
         }
    -    return events
    +    return events, nil
     }
Use this to verify the current silent-partial behavior and impacted call site:
    #!/bin/bash
    set -euo pipefail

    # 1) Confirm signature and scanner error handling in agui_store.go
    rg -n -C3 'func loadEvents\(|scanner\.Err\(\)|return events' components/backend/websocket/agui_store.go

    # 2) Confirm caller currently cannot receive/handle a load error
    rg -n -C3 'loadEvents\(' components/backend/websocket/agui_proxy.go
Expected result: loadEvents has no error return, scanner errors are logged-only, and caller treats returned slice as authoritative replay.
As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/backend/websocket/agui_store.go` around lines 171 - 210, The loadEvents function currently swallows scanner errors and returns partial event slices; change loadEvents(sessionID string) to return ([]map[string]interface{}, error), return an error whenever bufio.Scanner reports Err() (and also when json.Unmarshal fails for any non-empty line) instead of appending silently, and update any callers (e.g., places that call loadEvents in agui_proxy.go) to handle the error path and fail/surface the request rather than using truncated data; keep the existing legacy migration call to MigrateLegacySessionToAGUI but propagate and return any subsequent file/open/scan/json errors to callers.
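An alternative raised earlier in this review is to hand each event to a callback instead of returning a slice, which addresses both the memory and the silent-truncation concerns. The sketch below assumes an io.Reader in place of the opened event log; streamEvents and replayTypes are hypothetical names, and treating a malformed line as a hard error is one possible policy, not necessarily what the backend should adopt.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// streamEvents decodes one JSONL event at a time from r and passes it to emit.
// It returns an error on a read failure, a malformed line, or an error from
// emit, so a caller cannot mistake a partial replay for a complete one.
func streamEvents(r io.Reader, emit func(map[string]interface{}) error) error {
	br := bufio.NewReader(r)
	for lineNo := 1; ; lineNo++ {
		line, readErr := br.ReadBytes('\n')
		if readErr != nil && !errors.Is(readErr, io.EOF) {
			return fmt.Errorf("read line %d: %w", lineNo, readErr)
		}
		if line = bytes.TrimSpace(line); len(line) > 0 {
			var evt map[string]interface{}
			if err := json.Unmarshal(line, &evt); err != nil {
				return fmt.Errorf("decode line %d: %w", lineNo, err)
			}
			if err := emit(evt); err != nil {
				return fmt.Errorf("emit line %d: %w", lineNo, err)
			}
		}
		if readErr != nil { // io.EOF: the last (possibly unterminated) line is done
			return nil
		}
	}
}

// replayTypes is a small demo consumer: it collects the "type" field of every
// event instead of writing events to a client.
func replayTypes(jsonl string) ([]string, error) {
	var types []string
	err := streamEvents(strings.NewReader(jsonl), func(evt map[string]interface{}) error {
		t, _ := evt["type"].(string)
		types = append(types, t)
		return nil
	})
	return types, err
}
```

In a handler, emit would write the event to the SSE stream, so the number of decoded events held in memory stays at one regardless of log size.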
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ce2ccf8d-2c9a-4c68-b634-b5785e8c42bc
📒 Files selected for processing (1)
components/backend/websocket/agui_store.go
Summary
- Use bufio.Scanner instead of os.ReadFile to avoid loading entire JSONL files into memory
- Apply the same streaming fix to readJSONLFile in export.go
- Cap error response body reads with io.LimitReader
- Add K8s-level pagination to ListSessions

Root Cause
loadEvents() in agui_store.go used os.ReadFile() to load entire event log files into memory. For long-running sessions, these .jsonl files grow to hundreds of MB. Each GET /agui/events request loaded the full file, causing the backend pod to OOMKill at the 512Mi limit.

Test plan
- gofmt: clean
- go vet ./...: clean
- go build ./...: clean
- go test ./websocket/...: passing
- go test ./handlers/...: passing (1 pre-existing failure unrelated to changes)

Fixes: RHOAIENG-52922
Jira: RHOAIENG-52922
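
The error-body cap listed in the summary can be illustrated as follows. This is a sketch under the assumption that the cap is applied with io.LimitReader around the response body; maxErrorBody, readErrorBody and cappedLen are hypothetical names, with the 1KB figure taken from the commit message.

```go
package main

import (
	"io"
	"strings"
)

// maxErrorBody caps how much of an error response is read (1KB, per the
// commit message; the constant name is an assumption).
const maxErrorBody = 1024

// readErrorBody reads at most maxErrorBody bytes from body, so an unexpectedly
// large upstream error response cannot inflate memory use.
func readErrorBody(body io.Reader) (string, error) {
	b, err := io.ReadAll(io.LimitReader(body, maxErrorBody))
	return string(b), err
}

// cappedLen is a demo helper: the length readErrorBody returns for an n-byte body.
func cappedLen(n int) int {
	s, _ := readErrorBody(strings.NewReader(strings.Repeat("e", n)))
	return len(s)
}
```

The remainder of an oversized body is simply left unread; the caller is still expected to close the response body as usual.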