diff --git a/components/backend/types/agui.go b/components/backend/types/agui.go index 24b7fda82..b96b8fc3a 100644 --- a/components/backend/types/agui.go +++ b/components/backend/types/agui.go @@ -47,6 +47,14 @@ const ( EventTypeToolCallArgs = "TOOL_CALL_ARGS" EventTypeToolCallEnd = "TOOL_CALL_END" + // Reasoning events (streaming) + // See: https://docs.ag-ui.com/concepts/events#reasoning-events + EventTypeReasoningStart = "REASONING_START" + EventTypeReasoningEnd = "REASONING_END" + EventTypeReasoningMessageStart = "REASONING_MESSAGE_START" + EventTypeReasoningMessageContent = "REASONING_MESSAGE_CONTENT" + EventTypeReasoningMessageEnd = "REASONING_MESSAGE_END" + // State management events EventTypeStateSnapshot = "STATE_SNAPSHOT" EventTypeStateDelta = "STATE_DELTA" @@ -84,6 +92,7 @@ const ( RoleSystem = "system" RoleTool = "tool" RoleDeveloper = "developer" + RoleReasoning = "reasoning" RoleActivity = "activity" ) diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go index ecb605463..b7c58844b 100644 --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -153,30 +153,15 @@ func HandleAGUIEvents(c *gin.Context) { liveCh, cleanup := subscribeLive(sessionName) defer cleanup() - events := loadEvents(sessionName) + // loadEventsForReplay handles finished vs active runs: + // - Finished: returns snapshot-compacted events (MESSAGES_SNAPSHOT), + // cached to disk for future reads. + // - Active: returns raw events to preserve streaming structure. + events := loadEventsForReplay(sessionName) if len(events) > 0 { - // Check if the last run is finished. - runFinished := false - if last := events[len(events)-1]; last != nil { - if t, _ := last["type"].(string); t == types.EventTypeRunFinished { - runFinished = true - } - } - - if runFinished { - // Finished runs get compacted replay (fast, small). 
- compacted := compactStreamingEvents(events) - log.Printf("AGUI Events: %d raw → %d compacted events for %s (finished)", len(events), len(compacted), sessionName) - for _, evt := range compacted { - writeSSEEvent(c.Writer, evt) - } - } else { - // Active run — send raw events to preserve streaming structure. - log.Printf("AGUI Events: replaying %d raw events for %s (running)", len(events), sessionName) - for _, evt := range events { - writeSSEEvent(c.Writer, evt) - } + for _, evt := range events { + writeSSEEvent(c.Writer, evt) } c.Writer.Flush() } diff --git a/components/backend/websocket/agui_store.go b/components/backend/websocket/agui_store.go index 760500d4a..fa22db120 100644 --- a/components/backend/websocket/agui_store.go +++ b/components/backend/websocket/agui_store.go @@ -18,6 +18,7 @@ import ( "log" "net/http" "os" + "path/filepath" "sync" "sync/atomic" "time" @@ -29,6 +30,11 @@ import ( const writeMutexEvictAge = 30 * time.Minute +// ─── Compaction rate limiting ──────────────────────────────────────── +// compactionSem limits concurrent compaction goroutines to prevent unbounded +// goroutine spawning on high-volume RUN_FINISHED/RUN_ERROR events. +var compactionSem = make(chan struct{}, 10) // max 10 concurrent compactions + func init() { go func() { ticker := time.NewTicker(10 * time.Minute) @@ -112,6 +118,28 @@ func subscribeLive(sessionName string) (<-chan string, func()) { } } +// ─── Path helpers ──────────────────────────────────────────────────── + +// sessionEventsPath validates the sessionID and returns the path to the +// session's JSONL event log. Returns ("", false) if the ID is invalid. +func sessionEventsPath(sessionID string) (string, bool) { + if !isValidSessionName(sessionID) { + return "", false + } + baseDir := filepath.Clean(StateBaseDir) + return filepath.Join(baseDir, "sessions", sessionID, "agui-events.jsonl"), true +} + +// sessionDirPath validates the sessionID and returns the session directory. 
+// Returns ("", false) if the ID is invalid. +func sessionDirPath(sessionID string) (string, bool) { + if !isValidSessionName(sessionID) { + return "", false + } + baseDir := filepath.Clean(StateBaseDir) + return filepath.Join(baseDir, "sessions", sessionID), true +} + // ─── Write path ────────────────────────────────────────────────────── // writeMutexEntry wraps a per-session mutex with a last-used timestamp @@ -137,8 +165,12 @@ func getWriteMutex(sessionID string) *sync.Mutex { // persistEvent appends a single AG-UI event to the session's JSONL log. // Writes are serialised per-session via a mutex to prevent interleaving. func persistEvent(sessionID string, event map[string]interface{}) { - dir := fmt.Sprintf("%s/sessions/%s", StateBaseDir, sessionID) - path := dir + "/agui-events.jsonl" + dir, ok := sessionDirPath(sessionID) + if !ok { + log.Printf("AGUI Store: persist rejected - invalid session ID: %s", sessionID) + return + } + path := filepath.Join(dir, "agui-events.jsonl") _ = ensureDir(dir) data, err := json.Marshal(event) @@ -161,18 +193,38 @@ func persistEvent(sessionID string, event map[string]interface{}) { if _, err := f.Write(append(data, '\n')); err != nil { log.Printf("AGUI Store: failed to write event: %v", err) } + + // Compact finished runs immediately to snapshot-only events + eventType, _ := event["type"].(string) + switch eventType { + case types.EventTypeRunFinished, types.EventTypeRunError: + // Non-blocking compaction: skip if semaphore is full. + // Uncompacted sessions still serve correctly (raw events). 
+ select { + case compactionSem <- struct{}{}: + go func() { + defer func() { <-compactionSem }() + compactFinishedRun(sessionID) + }() + default: + log.Printf("AGUI Store: compaction skipped for %s (too many in-flight)", sessionID) + } + } } // ─── Read path ─────────────────────────────────────────────────────── -// loadEvents reads all AG-UI events for a session from the JSONL log -// using a streaming scanner to avoid loading the entire file into memory. +// loadEvents reads all AG-UI events for a session from the JSONL log. // Automatically triggers legacy migration if the log doesn't exist but // a pre-AG-UI messages.jsonl file does. func loadEvents(sessionID string) []map[string]interface{} { - path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) + path, ok := sessionEventsPath(sessionID) + if !ok { + log.Printf("AGUI Store: load rejected - invalid session ID: %s", sessionID) + return nil + } - f, err := os.Open(path) + events, err := readJSONLFile(path) if err != nil { if os.IsNotExist(err) { // Attempt legacy migration (messages.jsonl → agui-events.jsonl) @@ -180,7 +232,7 @@ func loadEvents(sessionID string) []map[string]interface{} { log.Printf("AGUI Store: legacy migration failed for %s: %v", sessionID, mErr) } // Retry after migration - f, err = os.Open(path) + events, err = readJSONLFile(path) if err != nil { return nil } @@ -189,25 +241,6 @@ func loadEvents(sessionID string) []map[string]interface{} { return nil } } - defer f.Close() - - events := make([]map[string]interface{}, 0, 64) - scanner := bufio.NewScanner(f) - // Allow lines up to 1MB (default 64KB may truncate large tool outputs) - scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize) - for scanner.Scan() { - line := scanner.Bytes() - if len(line) == 0 { - continue - } - var evt map[string]interface{} - if err := json.Unmarshal(line, &evt); err == nil { - events = append(events, evt) - } - } - if err := scanner.Err(); err != nil { - 
log.Printf("AGUI Store: error scanning event log for %s: %v", sessionID, err) - } return events } @@ -216,7 +249,10 @@ func loadEvents(sessionID string) []map[string]interface{} { // // Returns "" if the status cannot be determined (no events, file missing, etc.). func DeriveAgentStatus(sessionID string) string { - path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) + path, ok := sessionEventsPath(sessionID) + if !ok { + return "" + } // Read only the tail of the file to avoid loading entire event log into memory. // Use 2x scannerMaxLineSize to ensure we can read at least one complete max-sized @@ -314,185 +350,162 @@ func DeriveAgentStatus(sessionID string) string { return "" } -// ─── Compaction ────────────────────────────────────────────────────── +// ─── Snapshot compaction (AG-UI serialization spec) ────────────────── // -// Go port of @ag-ui/client compactEvents. Concatenates streaming deltas -// so reconnect replays are compact and fast. - -type pendingText struct { - start map[string]interface{} - deltas []string - end map[string]interface{} - otherEvents []map[string]interface{} -} +// See: https://docs.ag-ui.com/concepts/serialization -type pendingTool struct { - start map[string]interface{} - deltas []string - end map[string]interface{} - otherEvents []map[string]interface{} +// loadEventsForReplay loads events for SSE replay. +// +// For finished runs, the file is already compacted to snapshot-only events +// by compactFinishedRun(), so we just read and return. +// +// For active runs, the file contains streaming events which are necessary +// for real-time SSE connections. 
+func loadEventsForReplay(sessionID string) []map[string]interface{} { + events := loadEvents(sessionID) + if len(events) > 0 { + // Check if finished or active + last := events[len(events)-1] + if last != nil { + lastType, _ := last["type"].(string) + if lastType == types.EventTypeRunFinished || lastType == types.EventTypeRunError { + log.Printf("AGUI Events: serving %d snapshot events for %s (finished)", len(events), sessionID) + } else { + log.Printf("AGUI Events: serving %d streaming events for %s (active)", len(events), sessionID) + } + } + } + return events } -// compactStreamingEvents concatenates TEXT_MESSAGE_CONTENT and TOOL_CALL_ARGS -// deltas for the same messageId/toolCallId. All other events pass through. -func compactStreamingEvents(events []map[string]interface{}) []map[string]interface{} { - compacted := make([]map[string]interface{}, 0, len(events)/2) +// compactFinishedRun replaces the raw event log with snapshot-only events. +// +// Per AG-UI serialization spec, finished runs should only store: +// - MESSAGES_SNAPSHOT (emitted by runner in finally block) +// - STATE_SNAPSHOT (emitted when state changes) +// - Lifecycle events (RUN_STARTED, RUN_FINISHED, RUN_ERROR, STEP_*) +// - Extension events (RAW, CUSTOM, META for user feedback) +// - Frontend state (ACTIVITY_SNAPSHOT) +// +// This deletes streaming events that are superseded by snapshots: +// - TEXT_MESSAGE_START/CONTENT/END (superseded by MESSAGES_SNAPSHOT) +// - TOOL_CALL_START/ARGS/END (superseded by MESSAGES_SNAPSHOT) +// - REASONING_START/END, REASONING_MESSAGE_START/CONTENT/END (superseded by MESSAGES_SNAPSHOT) +// - STATE_DELTA (superseded by STATE_SNAPSHOT) +// - ACTIVITY_DELTA (superseded by ACTIVITY_SNAPSHOT) +// +// If no MESSAGES_SNAPSHOT is found, the session is considered corrupted and +// we keep the raw events as fallback. 
+func compactFinishedRun(sessionID string) { + dir, ok := sessionDirPath(sessionID) + if !ok { + log.Printf("AGUI Store: compaction rejected - invalid session ID: %s", sessionID) + return + } + rawPath := filepath.Join(dir, "agui-events.jsonl") - textByID := make(map[string]*pendingText) - var textOrder []string - toolByID := make(map[string]*pendingTool) - var toolOrder []string + // Hold the write mutex for the entire read-filter-rename to prevent + // concurrent persistEvent calls from writing events that get overwritten. + mu := getWriteMutex(sessionID) + mu.Lock() + defer mu.Unlock() - getText := func(id string) *pendingText { - if p, ok := textByID[id]; ok { - return p - } - p := &pendingText{} - textByID[id] = p - textOrder = append(textOrder, id) - return p - } - getTool := func(id string) *pendingTool { - if p, ok := toolByID[id]; ok { - return p - } - p := &pendingTool{} - toolByID[id] = p - toolOrder = append(toolOrder, id) - return p + // Read all events + events, err := readJSONLFile(rawPath) + if err != nil || len(events) == 0 { + log.Printf("AGUI Store: failed to read events for compaction (%s): %v", sessionID, err) + return } - flushText := func(id string) { - p := textByID[id] - if p == nil { - return - } - if p.start != nil { - compacted = append(compacted, p.start) - } - if len(p.deltas) > 0 { - combined := "" - for _, d := range p.deltas { - combined += d + // Filter to snapshot-only events + var snapshots []map[string]interface{} + hasMessagesSnapshot := false + + for _, evt := range events { + eventType, _ := evt["type"].(string) + switch eventType { + case types.EventTypeMessagesSnapshot: + hasMessagesSnapshot = true + snapshots = append(snapshots, evt) + case types.EventTypeStateSnapshot: + snapshots = append(snapshots, evt) + case types.EventTypeRunStarted, types.EventTypeRunFinished, types.EventTypeRunError, + types.EventTypeStepStarted, types.EventTypeStepFinished: + snapshots = append(snapshots, evt) + case types.EventTypeToolCallStart: 
+ // Preserve AskUserQuestion tool calls — DeriveAgentStatus() needs them + // to detect waiting_input status after compaction. + if toolName, _ := evt["toolCallName"].(string); isAskUserQuestionToolCall(toolName) { + snapshots = append(snapshots, evt) } - compacted = append(compacted, map[string]interface{}{ - "type": types.EventTypeTextMessageContent, - "messageId": id, - "delta": combined, - }) - } - if p.end != nil { - compacted = append(compacted, p.end) + case types.EventTypeRaw, types.EventTypeCustom, types.EventTypeMeta: + // Preserve custom events that aren't included in MESSAGES_SNAPSHOT + snapshots = append(snapshots, evt) + case types.EventTypeActivitySnapshot: + // Preserve frontend durable UI state (ACTIVITY_DELTA can be discarded, snapshot is canonical) + snapshots = append(snapshots, evt) } - compacted = append(compacted, p.otherEvents...) - delete(textByID, id) } - flushTool := func(id string) { - p := toolByID[id] - if p == nil { + // If no MESSAGES_SNAPSHOT found, session is corrupted - keep raw events + if !hasMessagesSnapshot { + log.Printf("AGUI Store: no MESSAGES_SNAPSHOT found for %s - session corrupted, keeping raw events", sessionID) + return + } + + log.Printf("AGUI Store: compacting %s from %d raw events → %d snapshot events", sessionID, len(events), len(snapshots)) + + // Write snapshots atomically to temp file + tmpFile, err := os.CreateTemp(dir, "agui-events-*.tmp") + if err != nil { + log.Printf("AGUI Store: failed to create temp file for compaction: %v", err) + return + } + tmpPath := tmpFile.Name() + + w := bufio.NewWriter(tmpFile) + for _, evt := range snapshots { + data, err := json.Marshal(evt) + if err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + log.Printf("AGUI Store: failed to marshal event during compaction: %v", err) return } - if p.start != nil { - compacted = append(compacted, p.start) - } - if len(p.deltas) > 0 { - combined := "" - for _, d := range p.deltas { - combined += d - } - compacted = 
append(compacted, map[string]interface{}{ - "type": types.EventTypeToolCallArgs, - "toolCallId": id, - "delta": combined, - }) + if _, err := w.Write(data); err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + log.Printf("AGUI Store: failed to write event during compaction: %v", err) + return } - if p.end != nil { - compacted = append(compacted, p.end) + if err := w.WriteByte('\n'); err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + log.Printf("AGUI Store: failed to write newline during compaction: %v", err) + return } - compacted = append(compacted, p.otherEvents...) - delete(toolByID, id) } - for _, evt := range events { - eventType, _ := evt["type"].(string) - switch eventType { - case types.EventTypeTextMessageStart: - if id, _ := evt["messageId"].(string); id != "" { - getText(id).start = evt - } else { - compacted = append(compacted, evt) - } - case types.EventTypeTextMessageContent: - if id, _ := evt["messageId"].(string); id != "" { - delta, _ := evt["delta"].(string) - getText(id).deltas = append(getText(id).deltas, delta) - } else { - compacted = append(compacted, evt) - } - case types.EventTypeTextMessageEnd: - if id, _ := evt["messageId"].(string); id != "" { - getText(id).end = evt - flushText(id) - } else { - compacted = append(compacted, evt) - } - case types.EventTypeToolCallStart: - if id, _ := evt["toolCallId"].(string); id != "" { - getTool(id).start = evt - } else { - compacted = append(compacted, evt) - } - case types.EventTypeToolCallArgs: - if id, _ := evt["toolCallId"].(string); id != "" { - delta, _ := evt["delta"].(string) - getTool(id).deltas = append(getTool(id).deltas, delta) - } else { - compacted = append(compacted, evt) - } - case types.EventTypeToolCallEnd: - if id, _ := evt["toolCallId"].(string); id != "" { - getTool(id).end = evt - flushTool(id) - } else { - compacted = append(compacted, evt) - } - default: - // Buffer "other" events into ALL currently open (incomplete) - // sequences so they replay in the 
correct position after - // compaction. If no sequences are open, emit directly. - buffered := false - for _, id := range textOrder { - if p := textByID[id]; p != nil && p.start != nil && p.end == nil { - p.otherEvents = append(p.otherEvents, evt) - buffered = true - } - } - for _, id := range toolOrder { - if p := toolByID[id]; p != nil && p.start != nil && p.end == nil { - p.otherEvents = append(p.otherEvents, evt) - buffered = true - } - } - if !buffered { - compacted = append(compacted, evt) - } - } + if err := w.Flush(); err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + log.Printf("AGUI Store: failed to flush buffer during compaction: %v", err) + return } - - // Flush incomplete sequences (mid-run reconnect) - for _, id := range textOrder { - if textByID[id] != nil { - flushText(id) - } + if err := tmpFile.Close(); err != nil { + _ = os.Remove(tmpPath) + log.Printf("AGUI Store: failed to close temp file during compaction: %v", err) + return } - for _, id := range toolOrder { - if toolByID[id] != nil { - flushTool(id) - } + + // Atomically replace raw events file with snapshots + if err := os.Rename(tmpPath, rawPath); err != nil { + log.Printf("AGUI Store: failed to replace raw events with snapshots: %v", err) + _ = os.Remove(tmpPath) + return } - return compacted + log.Printf("AGUI Store: successfully compacted %s to snapshot-only events", sessionID) } // ─── Timestamp sanitization ────────────────────────────────────────── diff --git a/components/backend/websocket/agui_store_test.go b/components/backend/websocket/agui_store_test.go index f9fbdd4d5..bc20cfda3 100644 --- a/components/backend/websocket/agui_store_test.go +++ b/components/backend/websocket/agui_store_test.go @@ -226,3 +226,226 @@ func TestDeriveAgentStatus(t *testing.T) { } }) } + +func TestLoadEventsForReplay(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "agui-replay-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + 
origStateBaseDir := StateBaseDir + StateBaseDir = tmpDir + defer func() { StateBaseDir = origStateBaseDir }() + + writeEvents := func(sessionID string, events []map[string]interface{}) { + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + } + + t.Run("finished session with MESSAGES_SNAPSHOT gets compacted", func(t *testing.T) { + sessionID := "test-replay-finished" + // Simulate what the runner sends: streaming events + MESSAGES_SNAPSHOT + RUN_FINISHED + writeEvents(sessionID, []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "r1"}, + {"type": types.EventTypeTextMessageStart, "messageId": "msg1", "role": "user"}, + {"type": types.EventTypeTextMessageContent, "messageId": "msg1", "delta": "Hello"}, + {"type": types.EventTypeTextMessageEnd, "messageId": "msg1"}, + {"type": types.EventTypeMessagesSnapshot, "messages": []interface{}{ + map[string]interface{}{"id": "msg1", "role": "user", "content": "Hello"}, + }}, + {"type": types.EventTypeRunFinished, "runId": "r1"}, + }) + + // Manually trigger compaction (in production, persistEvent does this on RUN_FINISHED) + compactFinishedRun(sessionID) + + // compactFinishedRun was called directly above and runs synchronously, so no wait is needed + + result := loadEventsForReplay(sessionID) + + // Should be compacted: RUN_STARTED + MESSAGES_SNAPSHOT + RUN_FINISHED + if len(result) != 3 { + t.Fatalf("Expected 3 events after compaction, got %d", len(result)) + } + + hasSnapshot := false + for _, evt := range result { + if evt["type"] == types.EventTypeMessagesSnapshot { + hasSnapshot = true + } + // Verify streaming events were removed + eventType := evt["type"] + if 
eventType == types.EventTypeTextMessageStart || + eventType == types.EventTypeTextMessageContent || + eventType == types.EventTypeTextMessageEnd { + t.Errorf("Streaming event %s should have been removed", eventType) + } + } + if !hasSnapshot { + t.Error("Expected MESSAGES_SNAPSHOT in compacted events") + } + }) + + t.Run("active session returns raw events", func(t *testing.T) { + sessionID := "test-replay-active" + writeEvents(sessionID, []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "r1"}, + {"type": types.EventTypeTextMessageStart, "messageId": "msg1", "role": "user"}, + {"type": types.EventTypeTextMessageContent, "messageId": "msg1", "delta": "Hello"}, + }) + + result := loadEventsForReplay(sessionID) + + // Active run — should return raw events unchanged + if len(result) != 3 { + t.Fatalf("Expected 3 raw events, got %d", len(result)) + } + if result[0]["type"] != types.EventTypeRunStarted { + t.Errorf("Expected RUN_STARTED, got %v", result[0]["type"]) + } + if result[1]["type"] != types.EventTypeTextMessageStart { + t.Errorf("Expected TEXT_MESSAGE_START, got %v", result[1]["type"]) + } + }) + + t.Run("corrupted session without MESSAGES_SNAPSHOT keeps raw events", func(t *testing.T) { + sessionID := "test-replay-corrupted" + // Simulate a corrupted session: has RUN_FINISHED but no MESSAGES_SNAPSHOT + writeEvents(sessionID, []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "r1"}, + {"type": types.EventTypeTextMessageStart, "messageId": "msg1", "role": "user"}, + {"type": types.EventTypeTextMessageContent, "messageId": "msg1", "delta": "Hello"}, + {"type": types.EventTypeTextMessageEnd, "messageId": "msg1"}, + {"type": types.EventTypeRunFinished, "runId": "r1"}, + }) + + // Try to compact (should fail gracefully and keep raw events) + compactFinishedRun(sessionID) + + result := loadEventsForReplay(sessionID) + + // Should still have all raw events (compaction failed due to missing MESSAGES_SNAPSHOT) + if len(result) 
!= 5 { + t.Fatalf("Expected 5 raw events (corruption fallback), got %d", len(result)) + } + + // Verify streaming events are still present + hasStreamingEvents := false + for _, evt := range result { + eventType := evt["type"] + if eventType == types.EventTypeTextMessageStart || + eventType == types.EventTypeTextMessageContent { + hasStreamingEvents = true + } + } + if !hasStreamingEvents { + t.Error("Expected streaming events to be preserved for corrupted session") + } + }) + + t.Run("STATE_SNAPSHOT is preserved during compaction", func(t *testing.T) { + sessionID := "test-replay-state" + // Simulate session with STATE_SNAPSHOT + writeEvents(sessionID, []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "r1"}, + {"type": types.EventTypeStateSnapshot, "snapshot": map[string]interface{}{"count": 42}}, + {"type": types.EventTypeTextMessageStart, "messageId": "msg1", "role": "assistant"}, + {"type": types.EventTypeTextMessageContent, "messageId": "msg1", "delta": "Done"}, + {"type": types.EventTypeTextMessageEnd, "messageId": "msg1"}, + {"type": types.EventTypeMessagesSnapshot, "messages": []interface{}{ + map[string]interface{}{"id": "msg1", "role": "assistant", "content": "Done"}, + }}, + {"type": types.EventTypeRunFinished, "runId": "r1"}, + }) + + compactFinishedRun(sessionID) + + result := loadEventsForReplay(sessionID) + + // Should have: RUN_STARTED + STATE_SNAPSHOT + MESSAGES_SNAPSHOT + RUN_FINISHED = 4 events + if len(result) != 4 { + t.Fatalf("Expected 4 events after compaction, got %d", len(result)) + } + + hasStateSnapshot := false + for _, evt := range result { + if evt["type"] == types.EventTypeStateSnapshot { + hasStateSnapshot = true + } + } + if !hasStateSnapshot { + t.Error("Expected STATE_SNAPSHOT to be preserved during compaction") + } + }) + + t.Run("META and CUSTOM events are preserved during compaction", func(t *testing.T) { + sessionID := "test-replay-custom" + // Simulate session with user feedback and custom events + 
writeEvents(sessionID, []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "r1"}, + {"type": types.EventTypeTextMessageStart, "messageId": "msg1", "role": "assistant"}, + {"type": types.EventTypeTextMessageContent, "messageId": "msg1", "delta": "Hello"}, + {"type": types.EventTypeTextMessageEnd, "messageId": "msg1"}, + {"type": types.EventTypeMeta, "metaType": "thumbs_up", "payload": map[string]interface{}{"messageId": "msg1"}}, + {"type": types.EventTypeCustom, "customType": "platform_event", "data": "important"}, + {"type": types.EventTypeRaw, "event": map[string]interface{}{"type": "message_metadata", "hidden": true}}, + {"type": types.EventTypeMessagesSnapshot, "messages": []interface{}{ + map[string]interface{}{"id": "msg1", "role": "assistant", "content": "Hello"}, + }}, + {"type": types.EventTypeRunFinished, "runId": "r1"}, + }) + + compactFinishedRun(sessionID) + + result := loadEventsForReplay(sessionID) + + // Should have: RUN_STARTED + META + CUSTOM + RAW + MESSAGES_SNAPSHOT + RUN_FINISHED = 6 events + if len(result) != 6 { + t.Fatalf("Expected 6 events after compaction, got %d", len(result)) + } + + hasMeta := false + hasCustom := false + hasRaw := false + for _, evt := range result { + eventType := evt["type"] + if eventType == types.EventTypeMeta { + hasMeta = true + } + if eventType == types.EventTypeCustom { + hasCustom = true + } + if eventType == types.EventTypeRaw { + hasRaw = true + } + // Verify streaming events were removed + if eventType == types.EventTypeTextMessageStart || + eventType == types.EventTypeTextMessageContent || + eventType == types.EventTypeTextMessageEnd { + t.Errorf("Streaming event %s should have been removed", eventType) + } + } + if !hasMeta { + t.Error("Expected META event to be preserved during compaction") + } + if !hasCustom { + t.Error("Expected CUSTOM event to be preserved during compaction") + } + if !hasRaw { + t.Error("Expected RAW event to be preserved during compaction") + } + }) +} diff 
--git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx index 361870ce6..59da3ca15 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx @@ -899,6 +899,23 @@ export default function ProjectSessionDetailPage({ }, }); } + } else if (msg.role === "reasoning" || msg.role === "developer") { + // ReasoningMessage (role="reasoning") per AG-UI spec carries thinking content. + // Also handle legacy DeveloperMessage (role="developer") from older sessions. + const thinkingText = typeof msg.content === 'string' ? msg.content : ''; + if (thinkingText) { + result.push({ + type: "agent_message", + id: msg.id, + content: { + type: "reasoning_block", + thinking: thinkingText, + signature: "", + }, + model: "claude", + timestamp, + }); + } } else if (msg.role === "system") { result.push({ type: "system_message", @@ -1048,7 +1065,32 @@ export default function ProjectSessionDetailPage({ } } - return result; + // Deduplicate reasoning blocks. Old sessions may have both REASONING_* + // streaming events (no messageId) and a MESSAGES_SNAPSHOT developer/reasoning + // message with a different ID — producing two identical thinking blocks. + // Use msg.id for keyed messages; fall back to content matching only for + // unkeyed (anonymous) duplicates within the same reasoning text. + const seenReasoningIds = new Set(); + const seenAnonThinking = new Set(); + const deduped = result.filter(msg => { + const content = 'content' in msg && typeof msg.content === 'object' && msg.content !== null ? msg.content : null; + if (content && 'thinking' in content && content.type === 'reasoning_block') { + const msgId = 'id' in msg ? 
(msg as { id: string }).id : ''; + if (msgId) { + // Keyed message — deduplicate by ID + if (seenReasoningIds.has(msgId)) return false; + seenReasoningIds.add(msgId); + } else { + // Anonymous legacy message — deduplicate by content + const key = (content as { thinking: string }).thinking; + if (seenAnonThinking.has(key)) return false; + seenAnonThinking.add(key); + } + } + return true; + }); + + return deduped; }, [ aguiState.messages, aguiState.currentToolCall, // Needed in Phase A to avoid orphaned-child promotion diff --git a/components/manifests/base/core/backend-deployment.yaml b/components/manifests/base/core/backend-deployment.yaml index ea72f3927..15ffa9db5 100644 --- a/components/manifests/base/core/backend-deployment.yaml +++ b/components/manifests/base/core/backend-deployment.yaml @@ -211,10 +211,10 @@ spec: resources: requests: cpu: 100m - memory: 128Mi + memory: 256Mi limits: cpu: 500m - memory: 512Mi + memory: 768Mi livenessProbe: httpGet: path: /health diff --git a/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py b/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py index 8ce97564d..0ec30d98b 100644 --- a/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py +++ b/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py @@ -622,6 +622,7 @@ async def _stream_claude_sdk( run_messages: List[Any] = [] pending_msg: Optional[Dict[str, Any]] = None accumulated_thinking_text = "" + current_reasoning_id: Optional[str] = None def _get_msg_id(msg): """Extract message ID from either a dict or an object.""" @@ -739,8 +740,9 @@ def flush_pending_msg(): if thinking_chunk: accumulated_thinking_text += thinking_chunk yield ReasoningMessageContentEvent( - thread_id=thread_id, - run_id=run_id, + threadId=thread_id, + runId=run_id, + messageId=current_reasoning_id, delta=thinking_chunk, ) elif delta_type == "input_json_delta": @@ -765,12 +767,15 @@ def flush_pending_msg(): if block_type == "thinking": in_thinking_block = True + 
current_reasoning_id = str(uuid.uuid4()) ts = now_ms() yield ReasoningStartEvent( - thread_id=thread_id, run_id=run_id, timestamp=ts + threadId=thread_id, runId=run_id, + messageId=current_reasoning_id, timestamp=ts, ) yield ReasoningMessageStartEvent( - thread_id=thread_id, run_id=run_id, timestamp=ts + threadId=thread_id, runId=run_id, + messageId=current_reasoning_id, timestamp=ts, ) elif block_type == "tool_use": # Tool call starting - emit TOOL_CALL_START @@ -802,20 +807,24 @@ def flush_pending_msg(): in_thinking_block = False ts = now_ms() yield ReasoningMessageEndEvent( - thread_id=thread_id, run_id=run_id, timestamp=ts + threadId=thread_id, runId=run_id, + messageId=current_reasoning_id, timestamp=ts, ) yield ReasoningEndEvent( - thread_id=thread_id, run_id=run_id, timestamp=ts + threadId=thread_id, runId=run_id, + messageId=current_reasoning_id, timestamp=ts, ) - # Persist thinking content + # Persist thinking content as ReasoningMessage per AG-UI spec. + # Use the same ID as the streaming events so the frontend + # merge logic deduplicates on MESSAGES_SNAPSHOT arrival. 
if accumulated_thinking_text: - from ag_ui.core import DeveloperMessage + from ag_ui.core import ReasoningMessage upsert_message( - DeveloperMessage( - id=str(uuid.uuid4()), - role="developer", + ReasoningMessage( + id=current_reasoning_id, + role="reasoning", content=accumulated_thinking_text, ) ) @@ -1115,10 +1124,10 @@ def flush_pending_msg(): logger.debug("Cleanup: closing hanging thinking block") ts = now_ms() yield ReasoningMessageEndEvent( - thread_id=thread_id, run_id=run_id, timestamp=ts + threadId=thread_id, runId=run_id, timestamp=ts ) yield ReasoningEndEvent( - thread_id=thread_id, run_id=run_id, timestamp=ts + threadId=thread_id, runId=run_id, timestamp=ts ) in_thinking_block = False diff --git a/components/runners/ambient-runner/ag_ui_claude_sdk/handlers.py b/components/runners/ambient-runner/ag_ui_claude_sdk/handlers.py index 094066b06..5635ecd08 100644 --- a/components/runners/ambient-runner/ag_ui_claude_sdk/handlers.py +++ b/components/runners/ambient-runner/ag_ui_claude_sdk/handlers.py @@ -232,15 +232,15 @@ async def handle_thinking_block( # Emit standard AG-UI reasoning events if thinking_text: ts = now_ms() - yield ReasoningStartEvent(thread_id=thread_id, run_id=run_id, timestamp=ts) + yield ReasoningStartEvent(threadId=thread_id, runId=run_id, timestamp=ts) yield ReasoningMessageStartEvent( - thread_id=thread_id, run_id=run_id, timestamp=ts + threadId=thread_id, runId=run_id, timestamp=ts ) yield ReasoningMessageContentEvent( - thread_id=thread_id, run_id=run_id, delta=thinking_text + threadId=thread_id, runId=run_id, delta=thinking_text ) - yield ReasoningMessageEndEvent(thread_id=thread_id, run_id=run_id, timestamp=ts) - yield ReasoningEndEvent(thread_id=thread_id, run_id=run_id, timestamp=ts) + yield ReasoningMessageEndEvent(threadId=thread_id, runId=run_id, timestamp=ts) + yield ReasoningEndEvent(threadId=thread_id, runId=run_id, timestamp=ts) # Also emit signature as custom event if present if signature: diff --git 
a/components/runners/ambient-runner/ag_ui_claude_sdk/reasoning_events.py b/components/runners/ambient-runner/ag_ui_claude_sdk/reasoning_events.py index d22e3871a..fda24c71e 100644 --- a/components/runners/ambient-runner/ag_ui_claude_sdk/reasoning_events.py +++ b/components/runners/ambient-runner/ag_ui_claude_sdk/reasoning_events.py @@ -9,50 +9,66 @@ ``REASONING_*`` wire format so the runner speaks the current AG-UI spec. Once ``ag-ui-protocol`` adds native support, these can be replaced with direct imports. + +Fields are declared with camelCase names to match the AG-UI wire format (the frontend +reads ``messageId``, ``threadId``, ``runId`` — not snake_case). """ from typing import Literal, Optional -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict + + +class _ReasoningBase(BaseModel): + """Base with camelCase serialization to match AG-UI wire format.""" + model_config = ConfigDict(populate_by_name=True) + + def model_dump(self, **kwargs): + kwargs.setdefault("by_alias", True) + return super().model_dump(**kwargs) + + def dict(self, **kwargs): + kwargs.setdefault("by_alias", True) + return super().dict(**kwargs) -class ReasoningStartEvent(BaseModel): +class ReasoningStartEvent(_ReasoningBase): type: Literal["REASONING_START"] = "REASONING_START" - thread_id: Optional[str] = None - run_id: Optional[str] = None - message_id: Optional[str] = None + threadId: Optional[str] = None + runId: Optional[str] = None + messageId: Optional[str] = None timestamp: Optional[int] = None -class ReasoningEndEvent(BaseModel): +class ReasoningEndEvent(_ReasoningBase): type: Literal["REASONING_END"] = "REASONING_END" - thread_id: Optional[str] = None - run_id: Optional[str] = None - message_id: Optional[str] = None + threadId: Optional[str] = None + runId: Optional[str] = None + messageId: Optional[str] = None timestamp: Optional[int] = None -class ReasoningMessageStartEvent(BaseModel): +class ReasoningMessageStartEvent(_ReasoningBase): type: 
Literal["REASONING_MESSAGE_START"] = "REASONING_MESSAGE_START" - thread_id: Optional[str] = None - run_id: Optional[str] = None - message_id: Optional[str] = None + threadId: Optional[str] = None + runId: Optional[str] = None + messageId: Optional[str] = None role: str = "assistant" timestamp: Optional[int] = None -class ReasoningMessageContentEvent(BaseModel): +class ReasoningMessageContentEvent(_ReasoningBase): type: Literal["REASONING_MESSAGE_CONTENT"] = "REASONING_MESSAGE_CONTENT" - thread_id: Optional[str] = None - run_id: Optional[str] = None - message_id: Optional[str] = None + threadId: Optional[str] = None + runId: Optional[str] = None + messageId: Optional[str] = None delta: str = "" timestamp: Optional[int] = None -class ReasoningMessageEndEvent(BaseModel): +class ReasoningMessageEndEvent(_ReasoningBase): type: Literal["REASONING_MESSAGE_END"] = "REASONING_MESSAGE_END" - thread_id: Optional[str] = None - run_id: Optional[str] = None - message_id: Optional[str] = None + threadId: Optional[str] = None + runId: Optional[str] = None + messageId: Optional[str] = None timestamp: Optional[int] = None