Skip to content

fix(backend): snapshot compaction for AGUI events to prevent OOM#914

Open
Gkrumbach07 wants to merge 6 commits intomainfrom
worktree-agui-snapshot-compaction
Open

fix(backend): snapshot compaction for AGUI events to prevent OOM#914
Gkrumbach07 wants to merge 6 commits intomainfrom
worktree-agui-snapshot-compaction

Conversation

@Gkrumbach07
Copy link
Contributor

Summary

  • Backend was OOMKilled (512Mi limit) due to loading 36K+ events into memory per SSE client reconnect
  • Implements AG-UI snapshot compaction: collapses finished event streams into MESSAGES_SNAPSHOT events (36K events → ~3 events)
  • Caches compacted snapshots to disk (agui-events-compacted.jsonl) with atomic writes; subsequent reads serve from cache
  • Removes dead compactStreamingEvents delta compaction code (180 lines, replaced by snapshot compaction)
  • Bumps backend memory limit from 512Mi → 768Mi as safety net

Key changes

  • compactToSnapshots() — assembles TEXT_MESSAGE and TOOL_CALL sequences into Message objects per AG-UI spec
  • loadEventsForReplay() — serves cached snapshots for finished sessions, raw events for active runs
  • Cache invalidation on RUN_STARTED and RUN_ERROR
  • Uses strings.Builder for O(n) delta concatenation (was O(n²) via +=)
  • Reuses existing readJSONLFile helper instead of duplicating

Test plan

  • All existing websocket tests pass (31 tests)
  • New TestCompactToSnapshots — verifies text messages, tool calls, RAW passthrough, metadata preservation
  • New TestLoadEventsForReplay — verifies finished/active session handling, cache write/read, cache invalidation
  • go vet, gofmt, go build all clean
  • Deploy to dev cluster and verify SSE reconnect works for finished and active sessions

🤖 Generated with Claude Code

@coderabbitai
Copy link

coderabbitai bot commented Mar 13, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Swaps streaming-delta compaction for snapshot-based compaction, adds loadEventsForReplay, updates HandleAGUIEvents to use it and unconditionally stream replay events, changes persistEvent to trigger/invalidate compaction, and adds unit/filesystem tests for compaction and replay behavior.

Changes

Cohort / File(s) Summary
Event Proxy Handler
components/backend/websocket/agui_proxy.go
Switched replay source to loadEventsForReplay(sessionName). Removed last-event (RUN_FINISHED/RUN_ERROR) detection/branching and now writes each replay event to the SSE stream in a single loop.
Event Store and Compaction
components/backend/websocket/agui_store.go
Replaced streaming-delta compaction with compactToSnapshots that assembles MESSAGES_SNAPSHOT (concatenates TEXT_MESSAGE_CONTENT and tool-call args). Added loadEventsForReplay(sessionID) and compactFinishedRun(sessionID) (atomic rewrite to compacted file). persistEvent now best-effort removes compacted cache on RunStarted and triggers background compaction on run termination.
Event Store Tests
components/backend/websocket/agui_store_test.go
Added TestCompactToSnapshots and TestLoadEventsForReplay covering message/tool-call snapshot assembly, preservation of RAW/META/STATE events, compaction for finished runs, fallback on missing snapshots, and filesystem isolation.
Module File
go.mod
Updated module dependencies/versions (lines changed).

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Proxy as Proxy (HandleAGUIEvents)
    participant Store as Event Store
    participant FS as Filesystem

    Client->>Proxy: Request session replay
    Proxy->>Store: loadEventsForReplay(sessionID)
    alt Compacted cache exists
        Store->>FS: Read agui-events-compacted.jsonl
        FS-->>Store: Return cached snapshots
        Store-->>Proxy: Return snapshot events
    else Cache missing
        Store->>FS: Read agui-events.jsonl (raw events)
        FS-->>Store: Return raw events
        alt Session is finished
            Store->>Store: compactToSnapshots(raw events)
            Store->>FS: writeCompactedFile (atomic replace)
            Store-->>Proxy: Return compacted snapshots
        else Session is active
            Store-->>Proxy: Return raw events
        end
    end
    Proxy->>Client: Stream each replay event to SSE
Loading
sequenceDiagram
    participant Proxy as Proxy (HandleAGUIEvents)
    participant Store as Event Store
    participant FS as Filesystem

    Proxy->>Store: persistEvent(sessionID, event)
    Store->>FS: Append event to agui-events.jsonl
    alt Event is RunStarted
        Store->>FS: Remove agui-events-compacted.jsonl (best-effort invalidation)
    end
    alt Event ends run (RunFinished/RunError)
        Store->>Store: Trigger background compactFinishedRun
        Store->>FS: writeCompactedFile (atomic replace) when ready
    end
    Store-->>Proxy: Persist acknowledged
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title directly and specifically describes the main change: implementing snapshot compaction for AGUI events to address OOM issues.
Description check ✅ Passed The description comprehensively relates to the changeset, detailing the OOM problem, snapshot compaction solution, key function changes, testing approach, and deployment status.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch worktree-agui-snapshot-compaction

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/backend/websocket/agui_store_test.go`:
- Around line 521-522: The test currently uses a fixed sleep to wait for the
async cache write which is racy; update the test to synchronize
deterministically: either change writeCompactedFile to expose a synchronization
primitive (return a done channel or accept a *sync.WaitGroup) and wait on that
in the test, or replace the time.Sleep in agui_store_test.go with a polling loop
that checks for the file’s existence (os.Stat) with a short interval and overall
timeout (failing the test if timeout elapses). Remove the time.Sleep(100 *
time.Millisecond) and use the chosen synchronization approach around
writeCompactedFile to avoid flakes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 13c147d5-0432-4476-9f09-606d669d7fd3

📥 Commits

Reviewing files that changed from the base of the PR and between 4c40817 and 8de8a27.

📒 Files selected for processing (4)
  • components/backend/websocket/agui_proxy.go
  • components/backend/websocket/agui_store.go
  • components/backend/websocket/agui_store_test.go
  • components/manifests/base/backend-deployment.yaml

@ambient-code ambient-code bot deleted a comment from Gkrumbach07 Mar 17, 2026
@ambient-code
Copy link
Contributor

ambient-code bot commented Mar 17, 2026

Review Queue Status

Check Status Detail
CI FAIL CodeQL
Conflicts pass
Reviews warn Awaiting review

Action needed: Fix CI failures

Auto-generated by Review Queue workflow. Updated when PR changes.

The backend was OOMKilled (512Mi limit) when replaying large event
streams for finished sessions. Multiple concurrent SSE clients each
loaded 36K+ events into memory and ran delta compaction, exceeding
the memory limit within ~44 seconds.

This implements AG-UI snapshot compaction per the serialization spec:
finished sessions are collapsed into MESSAGES_SNAPSHOT events (36K
events → ~3 events), cached to disk, and served from cache on
subsequent reads.

Changes:
- Add compactToSnapshots() using AG-UI MESSAGES_SNAPSHOT pattern
- Add disk caching (agui-events-compacted.jsonl) with atomic writes
- Invalidate cache on RUN_STARTED and RUN_ERROR events
- Use strings.Builder for O(n) delta concatenation (was O(n²))
- Reuse existing readJSONLFile helper instead of duplicating
- Remove dead compactStreamingEvents (180 lines, no longer called)
- Bump backend memory limit from 512Mi to 768Mi as safety net

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Gkrumbach07 Gkrumbach07 force-pushed the worktree-agui-snapshot-compaction branch from 8de8a27 to a73d289 Compare March 20, 2026 13:21
@Gkrumbach07
Copy link
Contributor Author

PR Cleanup Summary

✅ Completed Tasks

  1. Rebased onto latest origin/main

    • Successfully rebased from main (cdf2f6fd)
    • No merge conflicts
    • Force-pushed with --force-with-lease
  2. Code Review

    • Reviewed full diff (844 lines across 4 files)
    • Confirmed only relevant changes are present:
      • components/backend/websocket/agui_proxy.go - Updated to use loadEventsForReplay()
      • components/backend/websocket/agui_store.go - New snapshot compaction logic
      • components/backend/websocket/agui_store_test.go - New tests
      • components/manifests/base/core/backend-deployment.yaml - Memory limit increase
    • ✅ No development/debug markdown files
    • ✅ No .claude/ directory changes
    • ✅ No unrelated formatting changes
    • ✅ No temporary configs or test artifacts
  3. Backend Checks

    • gofmt -l . → Clean (no formatting issues)
    • go vet ./... → Pass
    • golangci-lint run → 0 issues
  4. Operator Checks

    • gofmt -l . → Clean
    • go vet ./... → Pass
    • golangci-lint run → 0 issues
  5. Frontend Build

    • npm run build → Success
  6. Backend Tests

    • All 31 websocket tests pass
    • New tests included:
      • TestCompactToSnapshots (6 sub-tests)
      • TestLoadEventsForReplay (4 sub-tests)

📊 Summary

PR is clean and ready for review. All checks pass, no extraneous changes detected.

- Check error returns from w.Write() and w.WriteByte()
- Check error return from f.Close()
- Explicitly ignore cleanup errors with `_ =` pattern
- Fixes CodeQL warnings about unchecked error returns

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@Gkrumbach07
Copy link
Contributor Author

CodeQL Fix Applied

Fixed unchecked error returns in writeCompactedFile() that were flagging CodeQL warnings:

  • ✅ Check error returns from w.Write() and w.WriteByte()
  • ✅ Check error return from f.Close()
  • ✅ Explicitly ignore cleanup errors with _ = pattern

All backend checks and tests still pass (31 tests, 0 lint issues).

Commit: 8ffd5b4

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/backend/websocket/agui_store.go`:
- Around line 167-171: The current invalidation only checks event["type"]
(eventType) against EventTypeRunStarted/RunError before removing the compacted
file (compactedPath), which allows RAW/MESSAGES_SNAPSHOT appended by
HandleAGUIRunProxy or HandleAGUIFeedback to make the compacted file stale or
cause permanent fallback; update the invalidation in the code that touches
compactedPath (and the same logic around lines 519-529) to derive freshness from
the latest run boundary or a raw-log version marker instead of just the last
event type: inspect the latest run state (e.g., last RUN_STARTED/RUN_FINISHED
timestamps or a persisted raw-log version counter), compare that to the
compacted file’s recorded run boundary/version, and remove/regenerate
compactedPath when the compacted file is older than the latest run
boundary/version so loadEventsForReplay can rely on compacted data safely;
ensure HandleAGUIRunProxy and HandleAGUIFeedback update the run-boundary/version
metadata when they persist RAW or MESSAGES_SNAPSHOT.
- Around line 531-536: The current async writer launched after
compactToSnapshots (writeCompactedFile(compactedPath, compacted)) allows
concurrent reconnects to each run loadEvents + compactToSnapshots and spawn
duplicate writers; serialize cache creation per session (e.g., use a per-session
lock or a singleflight.Group keyed by sessionID) so only one goroutine performs
loadEvents→compactToSnapshots→writeCompactedFile for a given sessionID, have
other callers wait for that result, and ensure the writer verifies it is still
the latest data before persisting (compare a session generation/version or check
cache existence) to avoid racing stale writes; update call sites that invoke
compactToSnapshots/writeCompactedFile to use the new per-session coordination
helper.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5adbf1d4-9373-4260-84ae-79963e07504e

📥 Commits

Reviewing files that changed from the base of the PR and between 8de8a27 and a73d289.

📒 Files selected for processing (3)
  • components/backend/websocket/agui_proxy.go
  • components/backend/websocket/agui_store.go
  • components/backend/websocket/agui_store_test.go

Comment on lines +531 to +536
// Finished run — compact to snapshots
compacted := compactToSnapshots(events)
log.Printf("AGUI Events: %d raw → %d snapshot events for %s (finished)", len(events), len(compacted), sessionID)

// Persist compacted file for future reads (best-effort, non-blocking)
go writeCompactedFile(compactedPath, compacted)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Cold-cache reconnects can still fan out the expensive compaction path.

This async write is launched without per-session coordination, so every concurrent reconnect that misses the cache will also execute loadEvents + compactToSnapshots and start another writer. That can recreate the first-reconnect memory spike this PR is trying to remove, and a later invalidation can still lose the race and have this goroutine recreate a stale compacted file. Serialize cache creation per session instead of letting every caller take this path. As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_store.go` around lines 531 - 536, The
current async writer launched after compactToSnapshots
(writeCompactedFile(compactedPath, compacted)) allows concurrent reconnects to
each run loadEvents + compactToSnapshots and spawn duplicate writers; serialize
cache creation per session (e.g., use a per-session lock or a singleflight.Group
keyed by sessionID) so only one goroutine performs
loadEvents→compactToSnapshots→writeCompactedFile for a given sessionID, have
other callers wait for that result, and ensure the writer verifies it is still
the latest data before persisting (compare a session generation/version or check
cache existence) to avoid racing stale writes; update call sites that invoke
compactToSnapshots/writeCompactedFile to use the new per-session coordination
helper.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
components/backend/websocket/agui_store.go (2)

167-171: ⚠️ Potential issue | 🟠 Major

Still unresolved: cache freshness is derived from the last record.

Lines 168-171 only invalidate on RUN_STARTED/RUN_ERROR, Line 508 serves the compacted file before inspecting the raw log, and Lines 524-528 decide terminality from only the final event. A trailing RAW or STATE_* append after RUN_FINISHED can therefore either keep replaying stale snapshots or permanently force raw replay for a session that is already complete. Persist and compare a raw-log generation or terminal run boundary before serving agui-events-compacted.jsonl.

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

Also applies to: 507-510, 519-529

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_store.go` around lines 167 - 171, The
compacted cache is considered fresh only by deleting it on RUN_STARTED/RUN_ERROR
but serving code (which reads agui-events-compacted.jsonl and uses the last
event to decide terminality) can return stale snapshots if trailing RAW/STATE_*
appends occur; add a persisted generation/terminal marker so the server can
validate the compacted file before serving. Specifically: when creating/writing
compactedPath (agui-events-compacted.jsonl) also write a small companion
metadata entry (e.g., agui-events-compacted.meta) containing the raw-log
generation ID or the terminal-run boundary (sequence number or final-event flag)
and update that metadata inside the same code paths that append raw events;
then, in the code path that serves/reads agui-events-compacted.jsonl (the logic
that inspects the raw log and decides terminality from the final event), compare
the current raw-log generation/terminal marker against the metadata; if they
differ, invalidate/remove compactedPath and regenerate/serve raw instead. Use
the existing symbols eventType, types.EventTypeRunStarted,
types.EventTypeRunError, compactedPath and the code that inspects the final
event to implement this comparison and atomic metadata updates.

531-536: ⚠️ Potential issue | 🟠 Major

Still unresolved: cold-cache reconnects can fan out compaction work.

Lines 531-536 let every cache miss run its own loadEvents/compactToSnapshots and launch another writer. A reconnect burst recreates the expensive path N times, and the slowest writer can still republish stale data after a newer run has already invalidated the cache. Gate compaction and cache writes behind per-session coordination, then recheck freshness immediately before the rename.

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

Also applies to: 541-584

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_store.go` around lines 531 - 536, Multiple
concurrent reconnects cause duplicated expensive work because each cache miss
calls loadEvents → compactToSnapshots and spawns writeCompactedFile; fix by
adding per-session coordination (e.g., a session-scoped mutex or singleflight
keyed by sessionID) around the compact/write path so only one goroutine runs
compactToSnapshots and writeCompactedFile for a given session at a time; before
performing the final rename/replace of compactedPath, recheck the cache/file
freshness (timestamp or version) to avoid overwriting newer results and have
other waiters read that fresh result instead of recomputing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/backend/websocket/agui_store.go`:
- Around line 513-533: The current path loads the entire raw event slice via
loadEvents into events and then calls compactToSnapshots, which causes full-peak
memory use on first finished-session replay; change the flow to avoid allocating
the full raw array by either (a) replacing loadEvents with a streaming reader
that iterates raw events and performs incremental compaction into compacted
(e.g., a new streamCompactEvents(sessionID) that yields/accumulates snapshots
without keeping all raw events), or (b) emit/build the compacted snapshot file
at run-terminal time so subsequent reads use compacted data only; update usages
around events, last, lastType, and the compactToSnapshots call to use the
streaming/terminal-compact API so the first finished replay never materializes
the entire raw log in memory.
- Around line 535-536: The background goroutine calls
writeCompactedFile(compactedPath, compacted) while the same compacted
[]map[string]interface{} may later be mutated by writeSSEEvent during replay,
causing concurrent map read/write; fix by deep-cloning the slice-of-maps before
spawning the goroutine (introduce and use a helper like cloneEventMaps(events
[]map[string]interface{}) []map[string]interface{} to copy each map and the
slice) and pass the cloned value to writeCompactedFile so the background writer
never shares mutable maps with writeSSEEvent.

---

Duplicate comments:
In `@components/backend/websocket/agui_store.go`:
- Around line 167-171: The compacted cache is considered fresh only by deleting
it on RUN_STARTED/RUN_ERROR but serving code (which reads
agui-events-compacted.jsonl and uses the last event to decide terminality) can
return stale snapshots if trailing RAW/STATE_* appends occur; add a persisted
generation/terminal marker so the server can validate the compacted file before
serving. Specifically: when creating/writing compactedPath
(agui-events-compacted.jsonl) also write a small companion metadata entry (e.g.,
agui-events-compacted.meta) containing the raw-log generation ID or the
terminal-run boundary (sequence number or final-event flag) and update that
metadata inside the same code paths that append raw events; then, in the code
path that serves/reads agui-events-compacted.jsonl (the logic that inspects the
raw log and decides terminality from the final event), compare the current
raw-log generation/terminal marker against the metadata; if they differ,
invalidate/remove compactedPath and regenerate/serve raw instead. Use the
existing symbols eventType, types.EventTypeRunStarted, types.EventTypeRunError,
compactedPath and the code that inspects the final event to implement this
comparison and atomic metadata updates.
- Around line 531-536: Multiple concurrent reconnects cause duplicated expensive
work because each cache miss calls loadEvents → compactToSnapshots and spawns
writeCompactedFile; fix by adding per-session coordination (e.g., a
session-scoped mutex or singleflight keyed by sessionID) around the
compact/write path so only one goroutine runs compactToSnapshots and
writeCompactedFile for a given session at a time; before performing the final
rename/replace of compactedPath, recheck the cache/file freshness (timestamp or
version) to avoid overwriting newer results and have other waiters read that
fresh result instead of recomputing.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 56566a7d-3e7e-4405-b926-0144a739e82d

📥 Commits

Reviewing files that changed from the base of the PR and between a73d289 and 8ffd5b4.

📒 Files selected for processing (1)
  • components/backend/websocket/agui_store.go

Per AG-UI serialization spec, finished runs should only store snapshot
events. This changes the compaction strategy from "compact on read" to
"compact on write immediately after RUN_FINISHED/RUN_ERROR".

### What Changed:

**Before:**
- Stored ALL events to `agui-events.jsonl` (36K+ streaming events)
- On read, created a NEW MESSAGES_SNAPSHOT from streaming events
- Cached to separate `agui-events-compacted.jsonl` file
- Two files per finished session

**After:**
- Store ALL events while run is active (needed for SSE streaming)
- When RUN_FINISHED/RUN_ERROR arrives, immediately replace raw file with snapshots-only
- Trust runner's MESSAGES_SNAPSHOT (emitted in finally block)
- If no MESSAGES_SNAPSHOT found, session is corrupted - keep raw events as fallback
- One file per session (the raw file IS the compacted file after run finishes)

### What We Keep:
- MESSAGES_SNAPSHOT (from runner)
- STATE_SNAPSHOT (from runner when state changes)
- Lifecycle events (RUN_STARTED, RUN_FINISHED, RUN_ERROR, STEP_*)
- RAW events (metadata, feedback)

### What We Delete:
- TEXT_MESSAGE_START/CONTENT/END
- TOOL_CALL_START/ARGS/END
- STATE_DELTA

Reduces storage from ~36K events to ~5-10 events per finished session.

Runner DOES emit MESSAGES_SNAPSHOT (confirmed in ag_ui_claude_sdk/adapter.py:1195 - in finally block)
Runner DOES emit STATE_SNAPSHOT (at start if state provided, on state updates)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@Gkrumbach07
Copy link
Contributor Author

Major Refactor: Trust Runner's MESSAGES_SNAPSHOT

Updated compaction strategy to align with AG-UI serialization spec after confirming:

  • ✅ Runner DOES emit MESSAGES_SNAPSHOT (in finally block - always present)
  • ✅ Runner DOES emit STATE_SNAPSHOT (when state provided/changes)

New Approach: Compact on Write

Before (compact on read):

  • Stored ALL 36K+ streaming events to agui-events.jsonl
  • On SSE reconnect, created NEW MESSAGES_SNAPSHOT from streaming events
  • Cached to separate agui-events-compacted.jsonl file
  • Two files per finished session

After (compact on write):

  • Store ALL events while run is active (needed for real-time SSE)
  • When RUN_FINISHED/RUN_ERROR arrives, immediately replace raw file with snapshots-only
  • Trust runner's MESSAGES_SNAPSHOT (canonical source)
  • If no MESSAGES_SNAPSHOT found → session corrupted, keep raw events as fallback
  • One file per session (raw file IS compacted after run finishes)

Storage Savings

Kept events:

  • MESSAGES_SNAPSHOT (from runner)
  • STATE_SNAPSHOT (from runner)
  • Lifecycle events (RUN_STARTED, RUN_FINISHED, RUN_ERROR, STEP_*)
  • RAW events (metadata, feedback)

Deleted events:

  • TEXT_MESSAGE_START/CONTENT/END
  • TOOL_CALL_START/ARGS/END
  • STATE_DELTA

Result: ~36K events → ~5-10 events per finished session

Implementation

  • compactFinishedRun() - filters events to snapshots-only, replaces file atomically
  • Triggered by persistEvent() when RUN_FINISHED/RUN_ERROR arrives
  • Non-blocking (goroutine) to avoid blocking event stream
  • Corruption-safe: if no MESSAGES_SNAPSHOT, keeps raw events

Tests Updated

  • ✅ Test compaction on write (not read)
  • ✅ Test corruption fallback (missing MESSAGES_SNAPSHOT)
  • ✅ Test STATE_SNAPSHOT preservation
  • ✅ All 31 websocket tests pass

Commit: ec7da93

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/backend/websocket/agui_store_test.go`:
- Around line 473-477: The test calls compactFinishedRun(sessionID)
synchronously, so the subsequent time.Sleep(100 * time.Millisecond) is redundant
and only slows tests; remove the sleep immediately following
compactFinishedRun(sessionID) and likewise remove the other duplicate sleeps
that follow other compactFinishedRun(...) calls in the same test (the three
occurrences present in agui_store_test.go) to eliminate the unnecessary delays.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: d67b861d-26ba-4f55-866e-cc642172875f

📥 Commits

Reviewing files that changed from the base of the PR and between 8ffd5b4 and ec7da93.

📒 Files selected for processing (2)
  • components/backend/websocket/agui_store.go
  • components/backend/websocket/agui_store_test.go

Comment on lines +473 to +477
compactFinishedRun(sessionID)

// Wait for async compaction
time.Sleep(100 * time.Millisecond)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Unnecessary time.Sleep after synchronous compactFinishedRun call.

compactFinishedRun(sessionID) is called directly (not via goroutine), so it completes synchronously before time.Sleep. The sleep adds ~300ms total delay across the three test cases without providing any synchronization benefit.

♻️ Proposed removal of unnecessary sleeps
 		// Manually trigger compaction (in production, persistEvent does this on RUN_FINISHED)
 		compactFinishedRun(sessionID)
-
-		// Wait for async compaction
-		time.Sleep(100 * time.Millisecond)

 		result := loadEventsForReplay(sessionID)

Apply similar removal at lines 537-538 and 576-577.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
compactFinishedRun(sessionID)
// Wait for async compaction
time.Sleep(100 * time.Millisecond)
compactFinishedRun(sessionID)
result := loadEventsForReplay(sessionID)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_store_test.go` around lines 473 - 477, The
test calls compactFinishedRun(sessionID) synchronously, so the subsequent
time.Sleep(100 * time.Millisecond) is redundant and only slows tests; remove the
sleep immediately following compactFinishedRun(sessionID) and likewise remove
the other duplicate sleeps that follow other compactFinishedRun(...) calls in
the same test (the three occurrences present in agui_store_test.go) to eliminate
the unnecessary delays.

MESSAGES_SNAPSHOT only contains messages (user, assistant, tool).
Custom/extension events are NOT included in MESSAGES_SNAPSHOT, so we
must persist them separately to preserve:

- META events (user feedback: thumbs up/down)
- CUSTOM events (platform extensions)
- RAW events (metadata like hidden messages)
- ACTIVITY_SNAPSHOT (frontend durable UI state)

Updated compaction to preserve all extension events. ACTIVITY_DELTA
is still discarded as it's superseded by ACTIVITY_SNAPSHOT.

Added test to verify META/CUSTOM/RAW preservation.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@Gkrumbach07
Copy link
Contributor Author

Fix: Preserve META, CUSTOM, and ACTIVITY_SNAPSHOT Events

Good catch! MESSAGES_SNAPSHOT only contains messages (user, assistant, tool). It does NOT include custom/extension events.

What Was Missing:

Before:

  • Only preserved RAW events
  • Lost META (user feedback: 👍/👎)
  • Lost CUSTOM (platform extensions)
  • Lost ACTIVITY_SNAPSHOT (frontend durable UI)

After:
Now preserving ALL extension events:

  • RAW - metadata (e.g., hidden message markers)
  • META - user feedback (thumbs up/down)
  • CUSTOM - platform-specific extensions
  • ACTIVITY_SNAPSHOT - frontend durable UI state

What We Still Discard:

Streaming events that are superseded by snapshots:

  • TEXT_MESSAGE_START/CONTENT/END → superseded by MESSAGES_SNAPSHOT
  • TOOL_CALL_START/ARGS/END → superseded by MESSAGES_SNAPSHOT
  • STATE_DELTA → superseded by STATE_SNAPSHOT
  • ACTIVITY_DELTA → superseded by ACTIVITY_SNAPSHOT

Test Added:

New test META_and_CUSTOM_events_are_preserved_during_compaction verifies:

  • META event (user feedback) is preserved
  • CUSTOM event (platform extension) is preserved
  • RAW event (metadata) is preserved
  • Streaming events are removed

All 32 websocket tests pass ✅

Commit: 2704ef7

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/backend/websocket/agui_store.go`:
- Around line 166-175: compactFinishedRun performs read→filter→os.Rename on
agui-events.jsonl without holding the per-session lock, risking races and lost
events; update compactFinishedRun to acquire the same per-session write lock
from getWriteMutex(sessionID) for the entire readJSONLFile→filter→os.Rename
sequence (or alternatively implement a version-check: read the file, record
highest event sequence/run id, filter, then before os.Rename re-check current
highest sequence and abort if it advanced), ensure the function uses sessionID
and compactedPath consistently and that any goroutine-launch sites (where
compactFinishedRun is called) rely on this serialized behavior so reconnects and
run-starts cannot race with compaction.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 216c8054-7ede-43b4-8c30-9f4f55e7745c

📥 Commits

Reviewing files that changed from the base of the PR and between ec7da93 and 2704ef7.

📒 Files selected for processing (2)
  • components/backend/websocket/agui_store.go
  • components/backend/websocket/agui_store_test.go

Comment on lines +166 to +175
// Compact finished runs immediately to snapshot-only events
eventType, _ := event["type"].(string)
switch eventType {
case types.EventTypeRunFinished, types.EventTypeRunError:
// Non-blocking compaction to replace raw events with snapshots
go compactFinishedRun(sessionID)
case types.EventTypeRunStarted:
// New run invalidates any cached compacted file from previous run
compactedPath := dir + "/agui-events-compacted.jsonl"
_ = os.Remove(compactedPath)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Compaction is rewriting the live session log outside the session lock.

compactFinishedRun reads and renames agui-events.jsonl without getWriteMutex(sessionID). Any append that lands between readJSONLFile and os.Rename can be lost, and if a new run starts in that window the compactor will filter away that run’s raw TEXT_MESSAGE_*/TOOL_CALL_* events before replacing the file. Because the compaction is also launched with go, reconnects in that window still hit the full raw log. Please serialize the read→filter→rename sequence with the same per-session lock, or version it by run boundary and abort when newer events exist before replacing the file.

Also applies to: 512-526, 546-640

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_store.go` around lines 166 - 175,
compactFinishedRun performs read→filter→os.Rename on agui-events.jsonl without
holding the per-session lock, risking races and lost events; update
compactFinishedRun to acquire the same per-session write lock from
getWriteMutex(sessionID) for the entire readJSONLFile→filter→os.Rename sequence
(or alternatively implement a version-check: read the file, record highest event
sequence/run id, filter, then before os.Rename re-check current highest sequence
and abort if it advanced), ensure the function uses sessionID and compactedPath
consistently and that any goroutine-launch sites (where compactFinishedRun is
called) rely on this serialized behavior so reconnects and run-starts cannot
race with compaction.

Gkrumbach07 and others added 2 commits March 20, 2026 14:12
- Add semaphore to limit max 10 concurrent compactions (prevents unbounded goroutine spawning on high-volume RUN_FINISHED events)
- Add error logging to previously silent error paths in compactFinishedRun for operational visibility

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
- Add isValidSessionName() validation to all functions that accept sessionID
- Use filepath.Join() instead of string concatenation for path construction
- Resolves 4 high-severity CodeQL alerts for uncontrolled data in path expressions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant