fix(backend): stream event logs to prevent OOMKill #890
Gkrumbach07 wants to merge 1 commit into main from
Conversation
The backend was loading entire AG-UI event log files into memory via os.ReadFile(), causing OOMKill on UAT with a 512Mi limit.

- Replace os.ReadFile with bufio.Scanner for streaming event reads
- Apply the same streaming fix to readJSONLFile in export.go
- Add io.LimitReader to error response body reads (capped at 1KB)
- Increase session map cleanup frequency (10min -> 2min)
- Reduce stale session threshold (1hr -> 10min)
- Add K8s-level pagination to ListSessions

Fixes: RHOAIENG-52922

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Walkthrough

The changes optimize memory usage across backend components: introducing pagination for Kubernetes API calls in session listing, converting file reads to streaming approaches with buffered scanners in event loading and JSONL parsing, and tightening session staleness detection with adjusted thresholds and cleanup intervals.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
components/backend/websocket/agui_store.go (1)
162-205: ⚠️ Potential issue | 🟠 Major

loadEvents still materializes the entire event log into memory, and now fails silently on large individual events.

The change from os.ReadFile to bufio.Scanner avoids one buffer allocation but doesn't address the core issue: every decoded event map is appended to the events slice before being returned and replayed. For long-running sessions, this still causes the same OOM scenario. Additionally, if any single JSONL row exceeds the 1 MiB scanner buffer, Scan() returns false with bufio.ErrTooLong, the loop exits, and HandleAGUIEvents returns a partial event history to the client with no indication of truncation. To fix both issues, the replay logic needs to stream events without first building the complete slice in memory.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@components/backend/websocket/agui_store.go` around lines 162-205, loadEvents currently reads and decodes every JSONL event into an in-memory slice and can silently fail on lines >1MiB; change it to stream decoded events instead of materializing them: replace loadEvents(sessionID string) []map[string]interface{} with a streaming variant used by HandleAGUIEvents, such as loadEvents(sessionID string, emit func(map[string]interface{}) error) error (or return a receive-only channel), keep the existing migration logic (MigrateLegacySessionToAGUI) and file/open retry, then inside the scanner loop unmarshal each line and immediately call emit(evt) (handling and propagating errors from emit), and explicitly detect scanner.Err() and bufio.ErrTooLong to log/return a clear error instead of returning a truncated partial slice.

components/backend/websocket/export.go (1)
202-227: ⚠️ Potential issue | 🟠 Major

This still materializes large exports in memory and caps individual JSONL lines at 1 MiB.

Although file reading is now streaming via bufio.Scanner, the entire decoded dataset is accumulated into the events slice (line 224), then fully re-marshaled with json.MarshalIndent (lines 120, 144) before being sent to the client. This approach fails to address the original peak-memory concern for large session exports. Additionally, any single JSONL line exceeding 1 MiB will cause the scanner to abort with ErrTooLong (line 211 sets the hard limit), making large events unexportable. To handle large exports safely, stream the JSON array directly to c.Writer without buffering the full decoded dataset or imposing per-line size limits.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@components/backend/websocket/export.go` around lines 202-227, the current readJSONLFile function plus subsequent json.MarshalIndent calls (used earlier when writing to c.Writer) accumulate the whole export in memory and cap per-line size via Scanner.Buffer; instead, remove the readJSONLFile usage and stream the JSONL file directly to c.Writer: open the file, set response headers, write the '[', then iterate over lines using a reader that doesn't impose a 1MiB scanner limit (e.g., bufio.Reader.ReadBytes('\n') or io.Reader with json.Decoder/streaming), for each line validate or wrap it as json.RawMessage and write comma-separated entries directly to c.Writer (flushing if using a streaming context), then write ']' and handle errors. This ensures you never json.MarshalIndent the full events slice and avoids Scanner's ErrTooLong while keeping the export streamed to c.Writer.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9a3f8d16-70c8-41f0-acc3-373af6ae6499
📒 Files selected for processing (4)
- components/backend/handlers/sessions.go
- components/backend/websocket/agui_proxy.go
- components/backend/websocket/agui_store.go
- components/backend/websocket/export.go
  const staleSessionThreshold = 10 * time.Minute

  // staleSessionCleanupInterval is how often the cleanup goroutine runs.
- const staleSessionCleanupInterval = 10 * time.Minute
+ const staleSessionCleanupInterval = 2 * time.Minute
Don't expire live session caches after 10 minutes.
sessionLastSeen is only refreshed when HandleAGUIEvents and HandleAGUIRunProxy start handling a request (Lines 141 and 266), not while an SSE stream or runner stream is still active. With a 10-minute threshold, cleanup can evict the cached runner port for a session that is still alive, so later /agui/interrupt, /agui/feedback, and similar calls fall back to handlers.DefaultRunnerPort and break non-default runners.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/backend/websocket/agui_proxy.go` around lines 76 - 79, The
staleSessionThreshold of 10 minutes causes live sessions to be evicted because
sessionLastSeen is only updated when HandleAGUIEvents and HandleAGUIRunProxy
start handling a request; fix by ensuring active streams keep the session alive:
either raise staleSessionThreshold to a much larger value (e.g., hours) or,
preferably, update sessionLastSeen periodically while SSE/runner streams are
active inside HandleAGUIEvents and HandleAGUIRunProxy (e.g., touch
sessionLastSeen on each send/heartbeat or in the stream loop), so cleanup won’t
evict cached runner ports and calls that rely on handlers.DefaultRunnerPort
fallback won’t break non-default runners.
Summary

- Use bufio.Scanner instead of os.ReadFile to avoid loading entire JSONL files into memory
- Apply the same streaming fix to readJSONLFile in export.go
- Cap error response body reads with io.LimitReader
- Add Kubernetes-level pagination to ListSessions

Root Cause

loadEvents() in agui_store.go used os.ReadFile() to load entire event log files into memory. For long-running sessions, these .jsonl files grow to hundreds of MB. Each GET /agui/events request loaded the full file, causing the backend pod to OOMKill at the 512Mi limit.

Test plan

- gofmt — clean
- go vet ./... — clean
- go build ./... — clean
- go test ./websocket/... — passing
- go test ./handlers/... — passing (1 pre-existing failure unrelated to these changes)

Fixes: RHOAIENG-52922
Jira: RHOAIENG-52922