Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions components/backend/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (
"k8s.io/client-go/kubernetes"
)

const (
	// k8sListPageSize is the Limit applied to each Kubernetes List request,
	// so custom resources are fetched in pages of at most this many items
	// instead of in a single unbounded response.
	k8sListPageSize = 500
)

// Package-level variables for session handlers (set from main package)
var (
GetAgenticSessionV1Alpha1Resource func() schema.GroupVersionResource
Expand Down Expand Up @@ -380,21 +383,34 @@ func ListSessions(c *gin.Context) {
}
types.NormalizePaginationParams(&params)

// Build list options with pagination
// Note: Kubernetes List with Limit returns a continue token for server-side pagination
// We use offset-based pagination on top of fetching all items for search/sort flexibility
// Build list options with K8s-level pagination so that no single List call has to return every CR.
// We still need all items for search/sort, so total memory is unchanged; paging only bounds per-call memory.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

list, err := k8sDyn.Resource(gvr).Namespace(project).List(ctx, v1.ListOptions{})
listOpts := v1.ListOptions{Limit: k8sListPageSize}
list, err := k8sDyn.Resource(gvr).Namespace(project).List(ctx, listOpts)
if err != nil {
log.Printf("Failed to list agentic sessions in project %s: %v", project, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"})
return
}

// Fetch remaining pages if the list was truncated by the Limit.
allItems := list.Items
for list.GetContinue() != "" {
listOpts.Continue = list.GetContinue()
list, err = k8sDyn.Resource(gvr).Namespace(project).List(ctx, listOpts)
if err != nil {
log.Printf("Failed to list agentic sessions (continue) in project %s: %v", project, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"})
return
}
allItems = append(allItems, list.Items...)
}
Comment on lines +386 to +410
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

This still materializes the full session set before paginating.

allItems accumulates every page and the next loop copies them again into sessions, so peak memory is still O(total sessions). In a large namespace this endpoint can still hit the same memory ceiling despite the K8s Limit. At minimum, build sessions page-by-page instead of buffering allItems; longer term, this API needs server-side or cursor-based pagination if the request is supposed to stay bounded.

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/handlers/sessions.go` around lines 386 - 410, The handler
currently accumulates every page into allItems then copies into sessions,
causing O(total sessions) peak memory; change the paging loop in the sessions
listing logic (where list, listOpts, allItems are used) to append each page's
list.Items directly into the final sessions slice as you fetch them (handle the
initial list.Items then for loop on list.GetContinue() appending list.Items each
iteration) instead of buffering allItems, and remove the intermediate allItems
copy so memory grows only as sessions are built per-page; keep the same error
handling around k8sDyn.Resource(gvr).Namespace(project).List calls and reuse the
existing ctx/cancel.


var sessions []types.AgenticSession
for _, item := range list.Items {
for _, item := range allItems {
meta, _, err := unstructured.NestedMap(item.Object, "metadata")
if err != nil {
log.Printf("ListSessions: failed to read metadata for %s/%s: %v", project, item.GetName(), err)
Expand Down
9 changes: 6 additions & 3 deletions components/backend/websocket/agui_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const (

// activityUpdateTimeout bounds how long a single activity status update can take.
activityUpdateTimeout = 10 * time.Second

// maxErrorBodyBytes caps how much of an error response body we read into memory.
maxErrorBodyBytes = 1024
)

// activityUpdateSem limits concurrent goroutines spawned by updateLastActivityTime.
Expand Down Expand Up @@ -321,7 +324,7 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, t
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
log.Printf("AGUI Proxy: runner returned %d: %s", resp.StatusCode, string(body))
publishAndPersistErrorEvents(sessionName, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode))
return
Expand Down Expand Up @@ -484,7 +487,7 @@ func HandleAGUIInterrupt(c *gin.Context) {
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
c.JSON(resp.StatusCode, gin.H{"error": string(body)})
return
}
Expand Down Expand Up @@ -549,7 +552,7 @@ func HandleAGUIFeedback(c *gin.Context) {
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
log.Printf("AGUI Feedback: runner returned %d for %s: %s", resp.StatusCode, sessionName, string(body))
c.JSON(resp.StatusCode, gin.H{"error": "Runner rejected feedback", "status": "failed"})
return
Expand Down
24 changes: 20 additions & 4 deletions components/backend/websocket/agui_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package websocket

import (
"ambient-code-backend/types"
"bufio"
"encoding/json"
Comment on lines +14 to 15
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Run gofmt on this file before merge.

The lint job is already red for websocket/agui_store.go, so this PR will not pass CI as-is.

As per coding guidelines, "Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."

Also applies to: 57-61

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_store.go` around lines 14 - 15, Run gofmt
(and optionally goimports) on the agui_store.go source to fix formatting issues
in the import block and the surrounding code; specifically reformat the import
block that currently lists "bufio" and "encoding/json" and any code around the
region that triggered the lint (the area noted in the review) so the file passes
gofmt checks and CI.

"fmt"
"log"
Expand Down Expand Up @@ -53,6 +54,12 @@ func evictStaleWriteMutexes() {
// Set from the STATE_BASE_DIR env var (default "/workspace") at startup.
var StateBaseDir string

// Buffer sizing for the bufio.Scanner instances that read JSONL files.
const (
	// scannerInitialBufferSize is the starting capacity of the scanner's
	// buffer: 64 KiB.
	scannerInitialBufferSize = 64 << 10

	// scannerMaxLineSize caps how far the scanner may grow its buffer, and
	// therefore the longest single JSONL line it can read: 1 MiB.
	scannerMaxLineSize = 1 << 20
)
Comment on lines +57 to +61
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n -C2 'scannerMaxLineSize|bufio.NewScanner|scanner.Buffer|scanner.Err|loadEvents\(' \
  components/backend/websocket/agui_store.go \
  components/backend/websocket/agui_proxy.go

Repository: ambient-code/platform

Length of output: 2700


🏁 Script executed:

# Get the complete loadEvents function implementation
sed -n '171,210p' components/backend/websocket/agui_store.go

Repository: ambient-code/platform

Length of output: 1303


🏁 Script executed:

# Check the context around the loadEvents call in agui_proxy.go
sed -n '145,170p' components/backend/websocket/agui_proxy.go

Repository: ambient-code/platform

Length of output: 1000


🏁 Script executed:

# Look for any downstream validation or error checking on the returned events
rg -n -A10 'events := loadEvents' components/backend/websocket/agui_proxy.go

Repository: ambient-code/platform

Length of output: 383


The scanner's 1MB line limit can silently truncate event replay without signaling failure.

When any JSONL line exceeds 1MB, Scanner.Scan() returns false and scanner.Err() reports bufio.ErrTooLong. However, loadEvents() only logs this error and returns the partial events slice already parsed. The caller in agui_proxy.go (line 156) has no way to detect incomplete parsing and treats the truncated slice as the complete replay history, causing clients to receive corrupted session state. Either switch to a reader-based loop with proper error propagation, or add an error return type to loadEvents() and fail the request visibly on scan errors.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_store.go` around lines 57 - 61, The
scanner's 1MB max line size (scannerMaxLineSize) can cause Scanner.Scan() to
stop and produce scanner.Err(), but loadEvents() currently swallows that and
returns a partial events slice; change loadEvents to return (events []Event, err
error) and, when scanner.Err() != nil, return the error instead of the partial
slice; update the caller in agui_proxy.go (the code at the call site around line
156) to handle the error and fail the request (or propagate it) instead of using
truncated events; keep the scanner constants but ensure error propagation so
oversized lines are detected and handled.


// ─── Live event pipe (multi-client broadcast) ───────────────────────
// The run handler pipes raw SSE lines to ALL connect handlers tailing
// the same session. Zero latency — same as the direct run() path.
Expand Down Expand Up @@ -157,21 +164,22 @@ func persistEvent(sessionID string, event map[string]interface{}) {

// ─── Read path ───────────────────────────────────────────────────────

// loadEvents reads all AG-UI events for a session from the JSONL log.
// loadEvents reads all AG-UI events for a session from the JSONL log
// using a streaming scanner to avoid loading the entire file into memory.
// Automatically triggers legacy migration if the log doesn't exist but
// a pre-AG-UI messages.jsonl file does.
func loadEvents(sessionID string) []map[string]interface{} {
path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID)

data, err := os.ReadFile(path)
f, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
// Attempt legacy migration (messages.jsonl → agui-events.jsonl)
if mErr := MigrateLegacySessionToAGUI(sessionID); mErr != nil {
log.Printf("AGUI Store: legacy migration failed for %s: %v", sessionID, mErr)
}
// Retry after migration
data, err = os.ReadFile(path)
f, err = os.Open(path)
if err != nil {
return nil
}
Expand All @@ -180,9 +188,14 @@ func loadEvents(sessionID string) []map[string]interface{} {
return nil
}
}
defer f.Close()

events := make([]map[string]interface{}, 0, 64)
for _, line := range splitLines(data) {
scanner := bufio.NewScanner(f)
// Allow lines up to 1MB (with the default 64KB limit, Scan stops with bufio.ErrTooLong on large tool outputs)
scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize)
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
Expand All @@ -191,6 +204,9 @@ func loadEvents(sessionID string) []map[string]interface{} {
events = append(events, evt)
}
}
if err := scanner.Err(); err != nil {
log.Printf("AGUI Store: error scanning event log for %s: %v", sessionID, err)
}
return events
}

Expand Down
12 changes: 8 additions & 4 deletions components/backend/websocket/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package websocket

import (
"bufio"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -199,15 +200,18 @@ func isValidSessionName(name string) bool {

// readJSONLFile reads a JSONL file and returns the parsed objects as a slice.
func readJSONLFile(path string) ([]map[string]interface{}, error) {
data, err := os.ReadFile(path)
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()

var events []map[string]interface{}
lines := splitLines(data)
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize)

for _, line := range lines {
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
Expand All @@ -220,5 +224,5 @@ func readJSONLFile(path string) ([]map[string]interface{}, error) {
events = append(events, event)
}

return events, nil
return events, scanner.Err()
}
Loading