diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 92484f571..81cfc5503 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -44,6 +44,9 @@ var ( GetGitHubToken func(context.Context, kubernetes.Interface, dynamic.Interface, string, string) (string, error) GetGitLabToken func(context.Context, kubernetes.Interface, string, string) (string, error) DeriveRepoFolderFromURL func(string) string + // DeriveAgentStatusFromEvents derives agentStatus from the persisted event log. + // Set by the websocket package at init to avoid circular imports. + DeriveAgentStatusFromEvents func(sessionName string) string // LEGACY: SendMessageToSession removed - AG-UI server uses HTTP/SSE instead of WebSocket ) @@ -364,6 +367,25 @@ func parseStatus(status map[string]interface{}) *types.AgenticSessionStatus { // V2 API Handlers - Multi-tenant session management +// enrichAgentStatus derives agentStatus from the persisted event log for +// Running sessions. This is the source of truth — it replaces the stale +// CR-cached value which was subject to goroutine race conditions. +func enrichAgentStatus(session *types.AgenticSession) { + if session.Status == nil || session.Status.Phase != "Running" { + return + } + if DeriveAgentStatusFromEvents == nil { + return + } + name, _ := session.Metadata["name"].(string) + if name == "" { + return + } + if derived := DeriveAgentStatusFromEvents(name); derived != "" { + session.Status.AgentStatus = types.StringPtr(derived) + } +} + func ListSessions(c *gin.Context) { project := c.GetString("project") @@ -447,6 +469,11 @@ func ListSessions(c *gin.Context) { totalCount := len(sessions) paginatedSessions, hasMore, nextOffset := paginateSessions(sessions, params.Offset, params.Limit) + // Derive agentStatus from event log only for paginated sessions (performance optimization) + for i := range paginatedSessions { + enrichAgentStatus(&paginatedSessions[i]) + } + response := types.PaginatedResponse{ Items: paginatedSessions, TotalCount: totalCount, @@ -919,6 +946,9 @@ func GetSession(c *gin.Context) { session.Status = parseStatus(status) } + // Derive agentStatus from event log (source of truth) for running sessions + enrichAgentStatus(&session) + session.AutoBranch = ComputeAutoBranch(sessionName) c.JSON(http.StatusOK, session) diff --git a/components/backend/main.go b/components/backend/main.go index dc2938d16..c8ac7da8c 100644 --- a/components/backend/main.go +++ b/components/backend/main.go @@ -166,6 +166,7 @@ func main() { // Initialize websocket package websocket.StateBaseDir = server.StateBaseDir + handlers.DeriveAgentStatusFromEvents = websocket.DeriveAgentStatus // Normal server mode if err := server.Run(registerRoutes); err != nil { diff --git a/components/backend/types/agui.go b/components/backend/types/agui.go index 72041da83..24b7fda82 100644 --- a/components/backend/types/agui.go +++ b/components/backend/types/agui.go @@ -69,6 +69,13 @@ const ( EventTypeMeta = "META" ) +// Agent status values derived from the AG-UI event stream. +const ( + AgentStatusWorking = "working" + AgentStatusIdle = "idle" + AgentStatusWaitingInput = "waiting_input" +) + // AG-UI Message Roles // See: https://docs.ag-ui.com/concepts/messages const ( diff --git a/components/backend/types/session.go b/components/backend/types/session.go index 828a9630f..e34aacb42 100644 --- a/components/backend/types/session.go +++ b/components/backend/types/session.go @@ -42,6 +42,7 @@ type AgenticSessionStatus struct { StartTime *string `json:"startTime,omitempty"` CompletionTime *string `json:"completionTime,omitempty"` LastActivityTime *string `json:"lastActivityTime,omitempty"` + AgentStatus *string `json:"agentStatus,omitempty"` StoppedReason *string `json:"stoppedReason,omitempty"` ReconciledRepos []ReconciledRepo `json:"reconciledRepos,omitempty"` ReconciledWorkflow *ReconciledWorkflow `json:"reconciledWorkflow,omitempty"` diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go index 862c2ba5e..d448c0e3f 100644 --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -439,15 +439,19 @@ func persistStreamedEvent(sessionID, runID, threadID, jsonData string) { persistEvent(sessionID, event) - // Update lastActivityTime on CR for activity events (debounced). - // Extract event type to check; projectName is derived from the + // Extract event type; projectName is derived from the // sessionID-to-project mapping populated by HandleAGUIRunProxy. eventType, _ := event["type"].(string) + + // Update lastActivityTime on CR for activity events (debounced). if isActivityEvent(eventType) { if projectName, ok := sessionProjectMap.Load(sessionID); ok { updateLastActivityTime(projectName.(string), sessionID, eventType == types.EventTypeRunStarted) } } + + // agentStatus is derived at query time from the event log (DeriveAgentStatus). + // No CR updates needed here — the persisted events ARE the source of truth. } // ─── POST /agui/interrupt ──────────────────────────────────────────── @@ -948,3 +952,16 @@ func updateLastActivityTime(projectName, sessionName string, immediate bool) { } }() } + +// isAskUserQuestionToolCall checks if a tool call name is the AskUserQuestion HITL tool. +// Uses case-insensitive comparison after stripping non-alpha characters, +// matching the frontend pattern in use-agent-status.ts. +func isAskUserQuestionToolCall(name string) bool { + var clean strings.Builder + for _, r := range strings.ToLower(name) { + if r >= 'a' && r <= 'z' { + clean.WriteRune(r) + } + } + return clean.String() == "askuserquestion" +} diff --git a/components/backend/websocket/agui_store.go b/components/backend/websocket/agui_store.go index 548289823..baf7142ca 100644 --- a/components/backend/websocket/agui_store.go +++ b/components/backend/websocket/agui_store.go @@ -12,6 +12,7 @@ package websocket import ( "ambient-code-backend/types" "bufio" + "bytes" "encoding/json" "fmt" "log" @@ -210,6 +211,108 @@ func loadEvents(sessionID string) []map[string]interface{} { return events } +// DeriveAgentStatus reads a session's event log and returns the agent +// status derived from the last significant events. +// +// Returns "" if the status cannot be determined (no events, file missing, etc.). +func DeriveAgentStatus(sessionID string) string { + path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) + + // Read only the tail of the file to avoid loading entire event log into memory. + // 64KB is sufficient for recent lifecycle events (scanning backwards). + const maxTailBytes = 64 * 1024 + + file, err := os.Open(path) + if err != nil { + return "" + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return "" + } + + fileSize := stat.Size() + var data []byte + + if fileSize <= maxTailBytes { + // File is small, read it all + data, err = os.ReadFile(path) + if err != nil { + return "" + } + } else { + // File is large, seek to tail and read last N bytes + offset := fileSize - maxTailBytes + _, err = file.Seek(offset, 0) + if err != nil { + return "" + } + + data = make([]byte, maxTailBytes) + n, err := file.Read(data) + if err != nil { + return "" + } + data = data[:n] + + // Skip partial first line (we seeked into the middle of a line) + if idx := bytes.IndexByte(data, '\n'); idx >= 0 { + data = data[idx+1:] + } + } + + lines := splitLines(data) + + // Scan backwards. We only care about lifecycle and AskUserQuestion events. + // RUN_STARTED → "working" + // RUN_FINISHED / RUN_ERROR → "idle", unless same run had AskUserQuestion + // TOOL_CALL_START (AskUserQuestion) → "waiting_input" + var runEndRunID string // set when we hit RUN_FINISHED/RUN_ERROR and need to look deeper + for i := len(lines) - 1; i >= 0; i-- { + if len(lines[i]) == 0 { + continue + } + var evt map[string]interface{} + if err := json.Unmarshal(lines[i], &evt); err != nil { + continue + } + evtType, _ := evt["type"].(string) + + switch evtType { + case types.EventTypeRunStarted: + if runEndRunID != "" { + // We were scanning for an AskUserQuestion but hit RUN_STARTED first → idle + return types.AgentStatusIdle + } + return types.AgentStatusWorking + + case types.EventTypeRunFinished, types.EventTypeRunError: + if runEndRunID == "" { + // First run-end seen; scan deeper within this run for AskUserQuestion + runEndRunID, _ = evt["runId"].(string) + } + + case types.EventTypeToolCallStart: + if runEndRunID != "" { + // Only relevant if we're scanning within the ended run + if evtRunID, _ := evt["runId"].(string); evtRunID != "" && evtRunID != runEndRunID { + return types.AgentStatusIdle + } + } + if toolName, _ := evt["toolCallName"].(string); isAskUserQuestionToolCall(toolName) { + return types.AgentStatusWaitingInput + } + } + } + + if runEndRunID != "" { + return types.AgentStatusIdle + } + return "" +} + // ─── Compaction ────────────────────────────────────────────────────── // // Go port of @ag-ui/client compactEvents. Concatenates streaming deltas diff --git a/components/backend/websocket/agui_store_test.go b/components/backend/websocket/agui_store_test.go new file mode 100644 index 000000000..f9fbdd4d5 --- /dev/null +++ b/components/backend/websocket/agui_store_test.go @@ -0,0 +1,228 @@ +package websocket + +import ( + "ambient-code-backend/types" + "encoding/json" + "os" + "path/filepath" + "testing" +) + +func TestDeriveAgentStatus(t *testing.T) { + // Create a temporary directory for test files + tmpDir, err := os.MkdirTemp("", "agui-store-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + // Set the StateBaseDir to our temp directory for testing + origStateBaseDir := StateBaseDir + StateBaseDir = tmpDir + defer func() { StateBaseDir = origStateBaseDir }() + + t.Run("empty file returns empty status", func(t *testing.T) { + sessionID := "test-session-empty" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create empty events file + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + if err := os.WriteFile(eventsFile, []byte(""), 0644); err != nil { + t.Fatalf("Failed to write events file: %v", err) + } + + status := DeriveAgentStatus(sessionID) + if status != "" { + t.Errorf("Expected empty status for empty file, got %q", status) + } + }) + + t.Run("RUN_STARTED only returns working", func(t *testing.T) { + sessionID := "test-session-run-started" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with RUN_STARTED event + event := map[string]interface{}{ + "type": types.EventTypeRunStarted, + "runId": "run-123", + } + eventData, _ := json.Marshal(event) + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + if err := os.WriteFile(eventsFile, append(eventData, '\n'), 0644); err != nil { + t.Fatalf("Failed to write events file: %v", err) + } + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusWorking { + t.Errorf("Expected %q for RUN_STARTED, got %q", types.AgentStatusWorking, status) + } + }) + + t.Run("RUN_FINISHED returns idle", func(t *testing.T) { + sessionID := "test-session-run-finished" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with RUN_STARTED then RUN_FINISHED + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeRunFinished, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusIdle { + t.Errorf("Expected %q for RUN_FINISHED, got %q", types.AgentStatusIdle, status) + } + }) + + t.Run("RUN_FINISHED with same-run AskUserQuestion returns waiting_input", func(t *testing.T) { + sessionID := "test-session-waiting-input" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with RUN_STARTED, AskUserQuestion TOOL_CALL_START, then RUN_FINISHED + // Scanning backwards: RUN_FINISHED → looks deeper → finds AskUserQuestion with same runId + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeToolCallStart, "runId": "run-123", "toolCallName": "AskUserQuestion"}, + {"type": types.EventTypeRunFinished, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusWaitingInput { + t.Errorf("Expected %q for same-run AskUserQuestion, got %q", types.AgentStatusWaitingInput, status) + } + }) + + t.Run("RUN_FINISHED with different-run AskUserQuestion returns idle", func(t *testing.T) { + sessionID := "test-session-different-run" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with old AskUserQuestion from run-456, then run-123 finishes + // Scanning backwards: RUN_FINISHED(run-123) → looks deeper → finds AskUserQuestion(run-456) → different run → idle + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-456"}, + {"type": types.EventTypeToolCallStart, "runId": "run-456", "toolCallName": "AskUserQuestion"}, + {"type": types.EventTypeRunFinished, "runId": "run-456"}, + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeRunFinished, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusIdle { + t.Errorf("Expected %q for different-run AskUserQuestion, got %q", types.AgentStatusIdle, status) + } + }) + + t.Run("RUN_ERROR returns idle", func(t *testing.T) { + sessionID := "test-session-run-error" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with RUN_STARTED then RUN_ERROR + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeRunError, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusIdle { + t.Errorf("Expected %q for RUN_ERROR, got %q", types.AgentStatusIdle, status) + } + }) + + t.Run("case-insensitive AskUserQuestion detection", func(t *testing.T) { + sessionID := "test-session-case-insensitive" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Test various casings of AskUserQuestion + testCases := []string{"askuserquestion", "ASKUSERQUESTION", "AskUserQuestion", "AsKuSeRqUeStIoN"} + for _, toolName := range testCases { + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeToolCallStart, "runId": "run-123", "toolCallName": toolName}, + {"type": types.EventTypeRunFinished, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusWaitingInput { + t.Errorf("Expected %q for toolName %q, got %q", types.AgentStatusWaitingInput, toolName, status) + } + } + }) + + t.Run("non-existent session returns empty status", func(t *testing.T) { + status := DeriveAgentStatus("non-existent-session") + if status != "" { + t.Errorf("Expected empty status for non-existent session, got %q", status) + } + }) +} diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx index ebc2e1591..1bd170c95 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx @@ -66,7 +66,9 @@ import { } from "@/components/ui/breadcrumb"; import Link from "next/link"; import { SessionHeader } from "./session-header"; -import { getPhaseColor } from "@/utils/session-helpers"; +import { SessionStatusDot } from "@/components/session-status-dot"; +import { AgentStatusIndicator } from "@/components/agent-status-indicator"; +import { useAgentStatus } from "@/hooks/use-agent-status"; // Extracted components import { AddContextModal } from "./components/modals/add-context-modal"; @@ -775,6 +777,13 @@ export default function ProjectSessionDetailPage({ handleWorkflowChange(workflowId); }; + // Derive agent-level status from session data and messages + const agentStatus = useAgentStatus( + session?.status?.phase || "Pending", + isRunActive, + aguiStream.state.messages, + ); + // Phase 1: convert committed messages + streaming tool cards into display format. // Does NOT depend on currentMessage / currentReasoning so it skips the full // O(n) traversal during text-streaming deltas (the most frequent event type). @@ -923,6 +932,12 @@ export default function ProjectSessionDetailPage({ // Handle text content by role if (msg.role === "user") { + // Hide AskUserQuestion response messages from the chat + const msgMeta = msg.metadata as Record | undefined; + if (msgMeta?.type === "ask_user_question_response") { + continue; + } + result.push({ type: "user_message", id: msg.id, // Preserve message ID for feedback association @@ -1383,6 +1398,18 @@ export default function ProjectSessionDetailPage({ } }; + // Send an AskUserQuestion response (hidden from chat, properly formatted) + const sendToolAnswer = async (formattedAnswer: string) => { + try { + await aguiSendMessage(formattedAnswer, { + type: "ask_user_question_response", + }); + } catch (err) { + toast.error(err instanceof Error ? err.message : "Failed to send answer"); + throw err; + } + }; + const handleCommandClick = async (slashCommand: string) => { try { await aguiSendMessage(slashCommand); @@ -1480,13 +1507,8 @@ export default function ProjectSessionDetailPage({ {session.spec.displayName || session.metadata.name} - - {session.status?.phase || "Pending"} - + + {agentName && ( {agentName} / {session.spec.llmSettings.model} @@ -1520,13 +1542,8 @@ export default function ProjectSessionDetailPage({ {session.spec.displayName || session.metadata.name} - - {session.status?.phase || "Pending"} - + + @@ -2628,6 +2645,7 @@ export default function ProjectSessionDetailPage({ chatInput={chatInput} setChatInput={setChatInput} onSendChat={() => Promise.resolve(sendChat())} + onSendToolAnswer={sendToolAnswer} onInterrupt={aguiInterrupt} onGoToResults={() => {}} onContinue={handleContinue} @@ -2706,6 +2724,7 @@ export default function ProjectSessionDetailPage({ chatInput={chatInput} setChatInput={setChatInput} onSendChat={() => Promise.resolve(sendChat())} + onSendToolAnswer={sendToolAnswer} onInterrupt={aguiInterrupt} onGoToResults={() => {}} onContinue={handleContinue} diff --git a/components/frontend/src/components/agent-status-indicator.tsx b/components/frontend/src/components/agent-status-indicator.tsx new file mode 100644 index 000000000..d5278b5d7 --- /dev/null +++ b/components/frontend/src/components/agent-status-indicator.tsx @@ -0,0 +1,109 @@ +"use client"; + +import { cn } from "@/lib/utils"; +import { Badge } from "@/components/ui/badge"; +import { + Loader2, + CheckCircle2, + XCircle, + Circle, + HelpCircle, +} from "lucide-react"; +import type { AgentStatus } from "@/types/agentic-session"; + +type AgentStatusIndicatorProps = { + status: AgentStatus; + compact?: boolean; + className?: string; +}; + +export function AgentStatusIndicator({ + status, + compact = false, + className, +}: AgentStatusIndicatorProps) { + switch (status) { + case "working": + return ( +
+ + {!compact && ( + + Working + + )} +
+ ); + + case "waiting_input": + return ( + + + {compact ? "Input" : "Needs Input"} + + ); + + case "completed": + return ( +
+ + {!compact && ( + + Completed + + )} +
+ ); + + case "failed": + return ( +
+ + {!compact && ( + + Failed + + )} +
+ ); + + case "idle": + return ( +
+ + {!compact && ( + Idle + )} +
+ ); + + default: + return null; + } +} diff --git a/components/frontend/src/components/session-status-dot.tsx b/components/frontend/src/components/session-status-dot.tsx new file mode 100644 index 000000000..65488075c --- /dev/null +++ b/components/frontend/src/components/session-status-dot.tsx @@ -0,0 +1,58 @@ +"use client"; + +import { cn } from "@/lib/utils"; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from "@/components/ui/tooltip"; +import type { AgenticSessionPhase } from "@/types/agentic-session"; + +type SessionStatusDotProps = { + phase: AgenticSessionPhase | string; + className?: string; +}; + +const DOT_COLORS: Record = { + Running: "bg-blue-500", + Completed: "bg-gray-400", + Stopped: "bg-gray-400", + Failed: "bg-red-500", + Pending: "bg-orange-400", + Creating: "bg-orange-400", + Stopping: "bg-orange-400", +}; + +const DOT_ANIMATIONS: Record = { + Creating: "animate-pulse", + Stopping: "animate-pulse", +}; + +export function SessionStatusDot({ phase, className }: SessionStatusDotProps) { + const color = DOT_COLORS[phase] || "bg-gray-400"; + const animation = DOT_ANIMATIONS[phase] || ""; + + return ( + + + + + ); + })} + + )} + + {/* Question text */} +

{currentQuestion.question}

+ + {/* Options */} + {renderQuestionOptions(currentQuestion)} + + {/* Footer */} + {!disabled && ( +
+
+ {isMultiQuestion + ? `${questions.filter(isQuestionAnswered).length}/${questions.length} answered` + : selections[currentQuestion.question] + ? "" + : "Select an option"} +
+ +
+ {isMultiQuestion && activeTab < questions.length - 1 && ( + + )} + + {(!isMultiQuestion || activeTab === questions.length - 1) && ( + + )} +
+
+ )} + + + + + ); +}; + +AskUserQuestionMessage.displayName = "AskUserQuestionMessage"; diff --git a/components/frontend/src/components/ui/stream-message.tsx b/components/frontend/src/components/ui/stream-message.tsx index bcd38db13..b7df86ed2 100644 --- a/components/frontend/src/components/ui/stream-message.tsx +++ b/components/frontend/src/components/ui/stream-message.tsx @@ -4,6 +4,7 @@ import React from "react"; import { MessageObject, ToolUseMessages, HierarchicalToolMessage } from "@/types/agentic-session"; import { LoadingDots, Message } from "@/components/ui/message"; import { ToolMessage } from "@/components/ui/tool-message"; +import { AskUserQuestionMessage } from "@/components/session/ask-user-question"; import { ThinkingMessage } from "@/components/ui/thinking-message"; import { SystemMessage } from "@/components/ui/system-message"; import { Button } from "@/components/ui/button"; @@ -12,11 +13,17 @@ import { FeedbackButtons } from "@/components/feedback"; export type StreamMessageProps = { message: (MessageObject | ToolUseMessages | HierarchicalToolMessage) & { streaming?: boolean }; onGoToResults?: () => void; + onSubmitAnswer?: (formattedAnswer: string) => Promise; plainCard?: boolean; isNewest?: boolean; agentName?: string; }; +function isAskUserQuestionTool(name: string): boolean { + const normalized = name.toLowerCase().replace(/[^a-z]/g, ""); + return normalized === "askuserquestion"; +} + const getRandomAgentMessage = () => { const messages = [ "The agents are working together on your request...", @@ -33,11 +40,24 @@ const getRandomAgentMessage = () => { return messages[Math.floor(Math.random() * messages.length)]; }; -export const StreamMessage: React.FC = ({ message, onGoToResults, plainCard=false, isNewest=false, agentName }) => { +export const StreamMessage: React.FC = ({ message, onGoToResults, onSubmitAnswer, plainCard=false, isNewest=false, agentName }) => { const isToolUsePair = (m: MessageObject | ToolUseMessages | HierarchicalToolMessage): m is ToolUseMessages | HierarchicalToolMessage => m != null && typeof m === "object" && "toolUseBlock" in m && "resultBlock" in m; if (isToolUsePair(message)) { + // Render AskUserQuestion with a custom interactive component + if (isAskUserQuestionTool(message.toolUseBlock.name)) { + return ( + + ); + } + // Check if this is a hierarchical message with children const hierarchical = message as HierarchicalToolMessage; return ( diff --git a/components/frontend/src/components/ui/tool-message.tsx b/components/frontend/src/components/ui/tool-message.tsx index 8136ddbae..512fd259b 100644 --- a/components/frontend/src/components/ui/tool-message.tsx +++ b/components/frontend/src/components/ui/tool-message.tsx @@ -308,6 +308,17 @@ const extractTextFromResultContent = (content: unknown): string => { const generateToolSummary = (toolName: string, input?: Record): string => { if (!input || Object.keys(input).length === 0) return formatToolName(toolName); + // AskUserQuestion - show first question text + if (toolName.toLowerCase().replace(/[^a-z]/g, "") === "askuserquestion") { + const questions = input.questions as Array<{ question: string }> | undefined; + if (questions?.length) { + const suffix = questions.length > 1 ? ` (+${questions.length - 1} more)` : ""; + return `Asking: "${questions[0].question}"${suffix}`; + } + return "Asking a question"; + } + + // WebSearch - show query if (toolName.toLowerCase().includes("websearch") || toolName.toLowerCase().includes("web_search")) { const query = input.query as string | undefined; diff --git a/components/frontend/src/components/workspace-sections/sessions-section.tsx b/components/frontend/src/components/workspace-sections/sessions-section.tsx index 2ffaa78f5..4a92fd777 100644 --- a/components/frontend/src/components/workspace-sections/sessions-section.tsx +++ b/components/frontend/src/components/workspace-sections/sessions-section.tsx @@ -22,7 +22,9 @@ import { } from '@/components/ui/pagination'; import { getPageNumbers } from '@/lib/pagination'; import { EmptyState } from '@/components/empty-state'; -import { SessionPhaseBadge } from '@/components/status-badge'; +import { SessionStatusDot } from '@/components/session-status-dot'; +import { AgentStatusIndicator } from '@/components/agent-status-indicator'; +import { deriveAgentStatusFromPhase } from '@/hooks/use-agent-status'; import { CreateSessionDialog } from '@/components/create-session-dialog'; import { EditSessionNameDialog } from '@/components/edit-session-name-dialog'; @@ -259,6 +261,7 @@ export function SessionsSection({ projectName }: SessionsSectionProps) { + Name Status Model @@ -277,6 +280,9 @@ export function SessionsSection({ projectName }: SessionsSectionProps) { return ( + + + @@ -298,7 +304,10 @@ export function SessionsSection({ projectName }: SessionsSectionProps) {

{session.spec.displayName || session.metadata.name}

- + {session.spec.displayName && (

{session.metadata.name}

@@ -326,7 +335,9 @@ export function SessionsSection({ projectName }: SessionsSectionProps) {
- +
diff --git a/components/frontend/src/hooks/agui/types.ts b/components/frontend/src/hooks/agui/types.ts index 3622ead09..ad3bd458f 100644 --- a/components/frontend/src/hooks/agui/types.ts +++ b/components/frontend/src/hooks/agui/types.ts @@ -25,7 +25,7 @@ export type UseAGUIStreamReturn = { state: AGUIClientState connect: (runId?: string) => void disconnect: () => void - sendMessage: (content: string) => Promise + sendMessage: (content: string, metadata?: Record) => Promise interrupt: () => Promise isConnected: boolean isStreaming: boolean diff --git a/components/frontend/src/hooks/use-agent-status.ts b/components/frontend/src/hooks/use-agent-status.ts new file mode 100644 index 000000000..b6fff8912 --- /dev/null +++ b/components/frontend/src/hooks/use-agent-status.ts @@ -0,0 +1,83 @@ +import { useMemo } from "react"; +import type { + AgenticSessionPhase, + AgentStatus, +} from "@/types/agentic-session"; +import type { PlatformMessage } from "@/types/agui"; + +function isAskUserQuestionTool(name: string): boolean { + const normalized = name.toLowerCase().replace(/[^a-z]/g, ""); + return normalized === "askuserquestion"; +} + +/** + * Derive agent status from session data and the raw AG-UI message stream. + * + * For the session detail page where the full message stream is available, + * this provides accurate status including `waiting_input` detection. + */ +export function useAgentStatus( + phase: AgenticSessionPhase | string, + isRunActive: boolean, + messages: PlatformMessage[], +): AgentStatus { + return useMemo(() => { + // Terminal states from session phase + if (phase === "Completed") return "completed"; + if (phase === "Failed") return "failed"; + if (phase === "Stopped") return "idle"; + + // Non-running phases + if (phase !== "Running") return "idle"; + + // Scan backwards for the last tool call to check for unanswered AskUserQuestion. + // Raw AG-UI messages store tool calls in msg.toolCalls[] (PlatformToolCall[]). + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]; + if (!msg.toolCalls || msg.toolCalls.length === 0) continue; + + // Check the last tool call on this message + const lastTc = msg.toolCalls[msg.toolCalls.length - 1]; + if (lastTc.function?.name && isAskUserQuestionTool(lastTc.function.name)) { + const hasResult = + lastTc.result !== undefined && + lastTc.result !== null && + lastTc.result !== ""; + if (!hasResult) { + return "waiting_input"; + } + } + + // Only check the most recent message with tool calls + break; + } + + // Active processing + if (isRunActive) return "working"; + + // Running but idle between turns + return "idle"; + }, [phase, isRunActive, messages]); +} + +/** + * Derive a simplified agent status from session phase alone. + * + * Used in the session list where per-session message streams are not available. + */ +export function deriveAgentStatusFromPhase( + phase: AgenticSessionPhase | string, +): AgentStatus { + switch (phase) { + case "Running": + return "working"; + case "Completed": + return "completed"; + case "Failed": + return "failed"; + case "Stopped": + return "idle"; + default: + return "idle"; + } +} diff --git a/components/frontend/src/hooks/use-agui-stream.ts b/components/frontend/src/hooks/use-agui-stream.ts index 1f1a08c8d..01f18508e 100644 --- a/components/frontend/src/hooks/use-agui-stream.ts +++ b/components/frontend/src/hooks/use-agui-stream.ts @@ -223,7 +223,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur // Send a message to start/continue the conversation // AG-UI server pattern: POST returns SSE stream directly const sendMessage = useCallback( - async (content: string) => { + async (content: string, metadata?: Record) => { // Send to backend via run endpoint - this returns an SSE stream const runUrl = `/api/projects/${encodeURIComponent(projectName)}/agentic-sessions/${encodeURIComponent(sessionName)}/agui/run` @@ -231,6 +231,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur id: crypto.randomUUID(), role: 'user' as const, content, + ...(metadata ? { metadata } : {}), } // Add user message to state immediately for instant UI feedback. diff --git a/components/frontend/src/services/queries/use-sessions.ts b/components/frontend/src/services/queries/use-sessions.ts index e1084b02d..0a5b89109 100644 --- a/components/frontend/src/services/queries/use-sessions.ts +++ b/components/frontend/src/services/queries/use-sessions.ts @@ -40,6 +40,33 @@ export function useSessionsPaginated(projectName: string, params: PaginationPara queryFn: () => sessionsApi.listSessionsPaginated(projectName, params), enabled: !!projectName, placeholderData: keepPreviousData, // Keep previous data while fetching new page + refetchOnMount: 'always', // Always refetch when navigating back to the list + // Smart polling: tier interval based on the most active session in the list + refetchInterval: (query) => { + const data = query.state.data as { items?: AgenticSession[] } | undefined; + const items = data?.items; + if (!items?.length) return false; + + // Tier 1: Any session transitioning phases → poll aggressively (2s) + const hasTransitioning = items.some((s) => { + const phase = s.status?.phase; + return phase === 'Pending' || phase === 'Creating' || phase === 'Stopping'; + }); + if (hasTransitioning) return 2000; + + // Tier 2: Any session with agent actively working → moderate (5s) + const hasWorking = items.some((s) => { + return s.status?.phase === 'Running' && (!s.status?.agentStatus || s.status?.agentStatus === 'working'); + }); + if (hasWorking) return 5000; + + // Tier 3: Any session running but agent idle/waiting → slow (15s) + const hasRunning = items.some((s) => s.status?.phase === 'Running'); + if (hasRunning) return 15000; + + // Tier 4: All sessions terminal → no polling + return false; + }, }); } diff --git a/components/frontend/src/types/agentic-session.ts b/components/frontend/src/types/agentic-session.ts index 813684e91..06f7cb5e0 100644 --- a/components/frontend/src/types/agentic-session.ts +++ b/components/frontend/src/types/agentic-session.ts @@ -1,5 +1,34 @@ export type AgenticSessionPhase = "Pending" | "Creating" | "Running" | "Stopping" | "Stopped" | "Completed" | "Failed"; +// Agent status (derived from message stream, distinct from session phase) +export type AgentStatus = + | "working" // Actively processing + | "waiting_input" // AskUserQuestion pending, needs human response + | "completed" // Task finished successfully + | "failed" // Task errored + | "idle"; // Session running, agent between turns + +// Subset of AgentStatus that can be persisted in the CR status field +// (completed/failed are derived at query time from phase, not stored) +export type StoredAgentStatus = "working" | "idle" | "waiting_input"; + +// AskUserQuestion tool types (Claude Agent SDK built-in) +export type AskUserQuestionOption = { + label: string; + description?: string; +}; + +export type AskUserQuestionItem = { + question: string; + header?: string; + options: AskUserQuestionOption[]; + multiSelect?: boolean; +}; + +export type AskUserQuestionInput = { + questions: AskUserQuestionItem[]; +}; + export type LLMSettings = { model: string; temperature: number; @@ -162,6 +191,7 @@ export type AgenticSessionStatus = { startTime?: string; completionTime?: string; lastActivityTime?: string; + agentStatus?: StoredAgentStatus; stoppedReason?: "user" | "inactivity"; reconciledRepos?: ReconciledRepo[]; reconciledWorkflow?: ReconciledWorkflow; diff --git a/components/frontend/src/types/api/sessions.ts b/components/frontend/src/types/api/sessions.ts index 41913eacd..2b3302bc9 100644 --- a/components/frontend/src/types/api/sessions.ts +++ b/components/frontend/src/types/api/sessions.ts @@ -29,6 +29,10 @@ export type AgenticSessionPhase = | 'Completed' | 'Failed'; +// Subset of agent status values that can be persisted in the CR status field +// (completed/failed are derived at query time from phase, not stored) +export type StoredAgentStatus = "working" | "idle" | "waiting_input"; + export type LLMSettings = { model: string; temperature: number; @@ -91,6 +95,7 @@ export type AgenticSessionStatus = { startTime?: string; completionTime?: string; lastActivityTime?: string; + agentStatus?: StoredAgentStatus; stoppedReason?: "user" | "inactivity"; jobName?: string; runnerPodName?: string; diff --git a/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py b/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py index 2e0bce56e..8ce97564d 100644 --- a/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py +++ b/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py @@ -29,6 +29,7 @@ ToolCallStartEvent, ToolCallArgsEvent, ToolCallEndEvent, + ToolCallResultEvent, StateSnapshotEvent, MessagesSnapshotEvent, ) @@ -70,6 +71,12 @@ emit_system_message_events, ) +# Built-in Claude tools that should halt the stream like frontend tools. +# These are HITL (human-in-the-loop) tools that require user input before +# the agent can continue. The adapter treats them identically to frontend +# tools registered via ``input_data.tools``. +BUILTIN_FRONTEND_TOOLS: set[str] = {"AskUserQuestion"} + logger = logging.getLogger(__name__) @@ -227,6 +234,21 @@ def __init__( # Current state tracking per run (for state management) self._current_state: Optional[Any] = None + # Whether the last run halted due to a frontend tool (caller should interrupt) + self._halted: bool = False + # Tool call ID that caused the halt (so we can emit TOOL_CALL_RESULT on next run) + self._halted_tool_call_id: Optional[str] = None + + @property + def halted(self) -> bool: + """Whether the last run halted due to a frontend tool. + + When ``True`` the caller should interrupt the underlying SDK client + to prevent it from auto-approving the halted tool call with a + placeholder result. + """ + return self._halted + async def run( self, input_data: RunAgentInput, @@ -252,8 +274,13 @@ async def run( thread_id = input_data.thread_id or str(uuid.uuid4()) run_id = input_data.run_id or str(uuid.uuid4()) - # Clear result data from any previous run + # Capture halted tool call ID before clearing (for TOOL_CALL_RESULT emission) + previous_halted_tool_call_id = self._halted_tool_call_id + + # Clear result data and halt flag from any previous run self._last_result_data = None + self._halted = False + self._halted_tool_call_id = None # Initialize state tracking for this run self._current_state = input_data.state @@ -287,6 +314,20 @@ async def run( # Process all messages and extract user message user_message, _ = process_messages(input_data) + # If the previous run halted for a frontend tool (e.g. AskUserQuestion), + # emit a TOOL_CALL_RESULT so the frontend can mark the question as answered. + if previous_halted_tool_call_id and user_message: + yield ToolCallResultEvent( + type=EventType.TOOL_CALL_RESULT, + thread_id=thread_id, + run_id=run_id, + message_id=f"{previous_halted_tool_call_id}-result", + tool_call_id=previous_halted_tool_call_id, + content=user_message, + role="tool", + timestamp=now_ms(), + ) + # Extract frontend tool names for halt detection (like Strands pattern) frontend_tool_names = ( set(extract_tool_names(input_data.tools)) if input_data.tools else set() @@ -844,9 +885,12 @@ def flush_pending_msg(): ) # Check if this is a frontend tool (using unprefixed name for comparison) - # Frontend tools should halt the stream so client can execute handler + # Frontend tools should halt the stream so client can execute handler. + # Also halt for built-in HITL tools (e.g. AskUserQuestion) that + # require user input before the agent can continue. is_frontend_tool = ( current_tool_display_name in frontend_tool_names + or current_tool_display_name in BUILTIN_FRONTEND_TOOLS ) if is_frontend_tool: @@ -877,9 +921,20 @@ def flush_pending_msg(): ) # NOTE: interrupt is the caller's responsibility - # (e.g. worker.interrupt() from the platform layer) + # (e.g. worker.interrupt() from the platform layer). + # Check adapter.halted after the stream ends. + self._halted = True + self._halted_tool_call_id = current_tool_call_id halt_event_stream = True + + # Clear in-flight tool call state to prevent duplicate + # ToolCallEndEvent emission in the finally block + current_tool_call_id = None + current_tool_call_name = None + current_tool_display_name = None + accumulated_tool_json = "" + # Continue consuming remaining events for cleanup continue diff --git a/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py b/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py index dfd02f88a..e6e400a68 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py @@ -58,6 +58,8 @@ def __init__(self) -> None: self._stderr_lines: list[str] = [] # Preserved session IDs across adapter rebuilds (e.g. repo additions) self._saved_session_ids: dict[str, str] = {} + # Per-thread halt tracking to avoid race conditions on shared adapter + self._halted_by_thread: dict[str, bool] = {} # ------------------------------------------------------------------ # PlatformBridge interface @@ -134,6 +136,22 @@ async def run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: self._session_manager._session_ids[thread_id] = worker.session_id self._session_manager._persist_session_ids() + # Capture halt state for this thread to avoid race conditions + # with concurrent runs modifying the shared adapter's halted flag + self._halted_by_thread[thread_id] = self._adapter.halted + + # If the adapter halted (frontend tool or built-in HITL tool like + # AskUserQuestion), interrupt the worker to prevent the SDK from + # auto-approving the tool call with a placeholder result. + if self._halted_by_thread.get(thread_id, False): + logger.info( + f"Adapter halted for thread={thread_id}, " + "interrupting worker to await user input" + ) + await worker.interrupt() + # Clear the halt flag for this thread + self._halted_by_thread.pop(thread_id, None) + self._first_run = False async def interrupt(self, thread_id: Optional[str] = None) -> None: @@ -179,6 +197,7 @@ def mark_dirty(self) -> None: self._ready = False self._first_run = True self._adapter = None + self._halted_by_thread.clear() if self._session_manager: # Preserve session IDs so --resume works after adapter rebuild. # Must be captured synchronously before the async shutdown task runs. diff --git a/components/runners/ambient-runner/ambient_runner/platform/prompts.py b/components/runners/ambient-runner/ambient_runner/platform/prompts.py index 85b1a6183..d30d01031 100644 --- a/components/runners/ambient-runner/ambient_runner/platform/prompts.py +++ b/components/runners/ambient-runner/ambient_runner/platform/prompts.py @@ -85,6 +85,15 @@ "Provide honest, calibrated scores with clear reasoning.\n\n" ) +HUMAN_INPUT_INSTRUCTIONS = ( + "## Human-in-the-Loop\n" + "When you need user input, a decision, or confirmation before proceeding, " + "you MUST use the AskUserQuestion tool. Do not ask questions in plain text " + "and wait for a response — the AskUserQuestion tool triggers platform " + "notifications and status indicators that help users know you need their " + "attention.\n\n" +) + RESTART_TOOL_DESCRIPTION = ( "Restart the Claude session to recover from issues, clear state, " "or get a fresh connection. Use this if you detect you're in a " @@ -206,6 +215,9 @@ def build_workspace_context_prompt( prompt += f"- **repos/{repo_name}/**\n" prompt += GIT_PUSH_STEPS.format(branch=push_branch) + # Human-in-the-loop instructions + prompt += HUMAN_INPUT_INSTRUCTIONS + # MCP integration setup instructions prompt += MCP_INTEGRATIONS_PROMPT