diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 29bf7f775..92484f571 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -34,6 +34,9 @@ import ( "k8s.io/client-go/kubernetes" ) +// k8sListPageSize bounds per-call memory when listing CRs from the API server. +const k8sListPageSize = 500 + // Package-level variables for session handlers (set from main package) var ( GetAgenticSessionV1Alpha1Resource func() schema.GroupVersionResource @@ -380,21 +383,34 @@ func ListSessions(c *gin.Context) { } types.NormalizePaginationParams(¶ms) - // Build list options with pagination - // Note: Kubernetes List with Limit returns a continue token for server-side pagination - // We use offset-based pagination on top of fetching all items for search/sort flexibility + // Build list options with K8s-level pagination to avoid loading all CRs into memory. + // We still need all items for search/sort, but fetching in pages bounds per-call memory. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - list, err := k8sDyn.Resource(gvr).Namespace(project).List(ctx, v1.ListOptions{}) + listOpts := v1.ListOptions{Limit: k8sListPageSize} + list, err := k8sDyn.Resource(gvr).Namespace(project).List(ctx, listOpts) if err != nil { log.Printf("Failed to list agentic sessions in project %s: %v", project, err) c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"}) return } + // Fetch remaining pages if the list was truncated by the Limit. + allItems := list.Items + for list.GetContinue() != "" { + listOpts.Continue = list.GetContinue() + list, err = k8sDyn.Resource(gvr).Namespace(project).List(ctx, listOpts) + if err != nil { + log.Printf("Failed to list agentic sessions (continue) in project %s: %v", project, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to list agentic sessions"}) + return + } + allItems = append(allItems, list.Items...) + } + var sessions []types.AgenticSession - for _, item := range list.Items { + for _, item := range allItems { meta, _, err := unstructured.NestedMap(item.Object, "metadata") if err != nil { log.Printf("ListSessions: failed to read metadata for %s/%s: %v", project, item.GetName(), err) diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go index 18a8e211a..862c2ba5e 100644 --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -41,6 +41,9 @@ const ( // activityUpdateTimeout bounds how long a single activity status update can take. activityUpdateTimeout = 10 * time.Second + + // maxErrorBodyBytes caps how much of an error response body we read into memory. + maxErrorBodyBytes = 1024 ) // activityUpdateSem limits concurrent goroutines spawned by updateLastActivityTime. @@ -321,7 +324,7 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, t defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes)) log.Printf("AGUI Proxy: runner returned %d: %s", resp.StatusCode, string(body)) publishAndPersistErrorEvents(sessionName, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode)) return @@ -484,7 +487,7 @@ func HandleAGUIInterrupt(c *gin.Context) { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes)) c.JSON(resp.StatusCode, gin.H{"error": string(body)}) return } @@ -549,7 +552,7 @@ func HandleAGUIFeedback(c *gin.Context) { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes)) log.Printf("AGUI Feedback: runner returned %d for %s: %s", resp.StatusCode, sessionName, string(body)) c.JSON(resp.StatusCode, gin.H{"error": "Runner rejected feedback", "status": "failed"}) return diff --git a/components/backend/websocket/agui_store.go b/components/backend/websocket/agui_store.go index a6ae9deec..548289823 100644 --- a/components/backend/websocket/agui_store.go +++ b/components/backend/websocket/agui_store.go @@ -11,6 +11,7 @@ package websocket import ( "ambient-code-backend/types" + "bufio" "encoding/json" "fmt" "log" @@ -53,6 +54,12 @@ func evictStaleWriteMutexes() { // Set from the STATE_BASE_DIR env var (default "/workspace") at startup. var StateBaseDir string +const ( + // Scanner buffer sizes for reading JSONL files + scannerInitialBufferSize = 64 * 1024 // 64KB initial buffer + scannerMaxLineSize = 1024 * 1024 // 1MB max line size +) + // ─── Live event pipe (multi-client broadcast) ─────────────────────── // The run handler pipes raw SSE lines to ALL connect handlers tailing // the same session. Zero latency — same as the direct run() path. @@ -157,13 +164,14 @@ func persistEvent(sessionID string, event map[string]interface{}) { // ─── Read path ─────────────────────────────────────────────────────── -// loadEvents reads all AG-UI events for a session from the JSONL log. +// loadEvents reads all AG-UI events for a session from the JSONL log +// using a streaming scanner to avoid loading the entire file into memory. // Automatically triggers legacy migration if the log doesn't exist but // a pre-AG-UI messages.jsonl file does. func loadEvents(sessionID string) []map[string]interface{} { path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) - data, err := os.ReadFile(path) + f, err := os.Open(path) if err != nil { if os.IsNotExist(err) { // Attempt legacy migration (messages.jsonl → agui-events.jsonl) @@ -171,7 +179,7 @@ func loadEvents(sessionID string) []map[string]interface{} { log.Printf("AGUI Store: legacy migration failed for %s: %v", sessionID, mErr) } // Retry after migration - data, err = os.ReadFile(path) + f, err = os.Open(path) if err != nil { return nil } @@ -180,9 +188,14 @@ func loadEvents(sessionID string) []map[string]interface{} { return nil } } + defer f.Close() events := make([]map[string]interface{}, 0, 64) - for _, line := range splitLines(data) { + scanner := bufio.NewScanner(f) + // Allow lines up to 1MB (default 64KB may truncate large tool outputs) + scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize) + for scanner.Scan() { + line := scanner.Bytes() if len(line) == 0 { continue } @@ -191,6 +204,9 @@ func loadEvents(sessionID string) []map[string]interface{} { events = append(events, evt) } } + if err := scanner.Err(); err != nil { + log.Printf("AGUI Store: error scanning event log for %s: %v", sessionID, err) + } return events } diff --git a/components/backend/websocket/export.go b/components/backend/websocket/export.go index 1651359db..478670ee0 100644 --- a/components/backend/websocket/export.go +++ b/components/backend/websocket/export.go @@ -2,6 +2,7 @@ package websocket import ( + "bufio" "context" "encoding/json" "fmt" @@ -199,15 +200,18 @@ func isValidSessionName(name string) bool { // readJSONLFile reads a JSONL file and returns parsed array of objects func readJSONLFile(path string) ([]map[string]interface{}, error) { - data, err := os.ReadFile(path) + f, err := os.Open(path) if err != nil { return nil, err } + defer f.Close() var events []map[string]interface{} - lines := splitLines(data) + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize) - for _, line := range lines { + for scanner.Scan() { + line := scanner.Bytes() if len(line) == 0 { continue } @@ -220,5 +224,5 @@ func readJSONLFile(path string) ([]map[string]interface{}, error) { events = append(events, event) } - return events, nil + return events, scanner.Err() }