diff --git a/.gitignore b/.gitignore index 8df0aa7c2..bf02432cc 100644 --- a/.gitignore +++ b/.gitignore @@ -88,6 +88,7 @@ dmypy.json # Claude Code .claude/settings.local.json +.claude/worktrees/ # mkdocs /site diff --git a/Makefile b/Makefile index 56f540607..5db7c477c 100644 --- a/Makefile +++ b/Makefile @@ -734,9 +734,9 @@ kind-port-forward: check-kubectl check-local-context ## Port-forward kind servic @echo "" @echo "$(COLOR_YELLOW)Press Ctrl+C to stop$(COLOR_RESET)" @echo "" - @trap 'echo ""; echo "$(COLOR_GREEN)✓$(COLOR_RESET) Port forwarding stopped"; exit 0' INT; \ - (kubectl port-forward -n ambient-code svc/frontend-service $(KIND_FWD_FRONTEND_PORT):3000 >/dev/null 2>&1 &); \ - (kubectl port-forward -n ambient-code svc/backend-service $(KIND_FWD_BACKEND_PORT):8080 >/dev/null 2>&1 &); \ + @trap 'kill 0; echo ""; echo "$(COLOR_GREEN)✓$(COLOR_RESET) Port forwarding stopped"; exit 0' INT; \ + kubectl port-forward -n $(NAMESPACE) svc/frontend-service $(KIND_FWD_FRONTEND_PORT):3000 >/dev/null 2>&1 & \ + kubectl port-forward -n $(NAMESPACE) svc/backend-service $(KIND_FWD_BACKEND_PORT):8080 >/dev/null 2>&1 & \ wait dev-bootstrap: check-kubectl check-local-context ## Bootstrap developer workspace with API key and integrations diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 29bf7f775..814d3e261 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -41,6 +41,10 @@ var ( GetGitHubToken func(context.Context, kubernetes.Interface, dynamic.Interface, string, string) (string, error) GetGitLabToken func(context.Context, kubernetes.Interface, string, string) (string, error) DeriveRepoFolderFromURL func(string) string + // DeriveAgentStatusFromEvents derives agentStatus from the persisted event log. + // Set by the websocket package at init to avoid circular imports. + // sessionID should be namespace-qualified (e.g., "namespace/sessionName") to avoid cross-project collisions. + DeriveAgentStatusFromEvents func(sessionID string) string // LEGACY: SendMessageToSession removed - AG-UI server uses HTTP/SSE instead of WebSocket ) @@ -361,6 +365,28 @@ func parseStatus(status map[string]interface{}) *types.AgenticSessionStatus { // V2 API Handlers - Multi-tenant session management +// enrichAgentStatus derives agentStatus from the persisted event log for +// Running sessions. This is the source of truth — it replaces the stale +// CR-cached value which was subject to goroutine race conditions. +func enrichAgentStatus(session *types.AgenticSession) { + if session.Status == nil || session.Status.Phase != "Running" { + return + } + if DeriveAgentStatusFromEvents == nil { + return + } + name, _ := session.Metadata["name"].(string) + namespace, _ := session.Metadata["namespace"].(string) + if name == "" || namespace == "" { + return + } + // Use namespace-qualified key to avoid cross-project collisions in the event store + sessionID := namespace + "/" + name + if derived := DeriveAgentStatusFromEvents(sessionID); derived != "" { + session.Status.AgentStatus = types.StringPtr(derived) + } +} + func ListSessions(c *gin.Context) { project := c.GetString("project") @@ -431,6 +457,11 @@ func ListSessions(c *gin.Context) { totalCount := len(sessions) paginatedSessions, hasMore, nextOffset := paginateSessions(sessions, params.Offset, params.Limit) + // Derive agentStatus from event log only for paginated sessions (performance optimization) + for i := range paginatedSessions { + enrichAgentStatus(&paginatedSessions[i]) + } + response := types.PaginatedResponse{ Items: paginatedSessions, TotalCount: totalCount, @@ -645,9 +676,9 @@ func CreateSession(c *gin.Context) { timeout = *req.Timeout } - // Generate unique name (timestamp-based) + // Generate unique name (millisecond timestamp for burst-creation safety) // Note: Runner will create branch as "ambient/{session-name}" - timestamp := time.Now().Unix() + timestamp := time.Now().UnixMilli() name := fmt.Sprintf("session-%d", timestamp) // Create the custom resource @@ -903,6 +934,9 @@ func GetSession(c *gin.Context) { session.Status = parseStatus(status) } + // Derive agentStatus from event log (source of truth) for running sessions + enrichAgentStatus(&session) + session.AutoBranch = ComputeAutoBranch(sessionName) c.JSON(http.StatusOK, session) diff --git a/components/backend/main.go b/components/backend/main.go index b2b7f6984..993e423cf 100644 --- a/components/backend/main.go +++ b/components/backend/main.go @@ -163,6 +163,7 @@ func main() { // Initialize websocket package websocket.StateBaseDir = server.StateBaseDir + handlers.DeriveAgentStatusFromEvents = websocket.DeriveAgentStatus // Normal server mode if err := server.Run(registerRoutes); err != nil { diff --git a/components/backend/types/agui.go b/components/backend/types/agui.go index 72041da83..24b7fda82 100644 --- a/components/backend/types/agui.go +++ b/components/backend/types/agui.go @@ -69,6 +69,13 @@ const ( EventTypeMeta = "META" ) +// Agent status values derived from the AG-UI event stream. +const ( + AgentStatusWorking = "working" + AgentStatusIdle = "idle" + AgentStatusWaitingInput = "waiting_input" +) + // AG-UI Message Roles // See: https://docs.ag-ui.com/concepts/messages const ( diff --git a/components/backend/types/session.go b/components/backend/types/session.go index 828a9630f..e34aacb42 100644 --- a/components/backend/types/session.go +++ b/components/backend/types/session.go @@ -42,6 +42,7 @@ type AgenticSessionStatus struct { StartTime *string `json:"startTime,omitempty"` CompletionTime *string `json:"completionTime,omitempty"` LastActivityTime *string `json:"lastActivityTime,omitempty"` + AgentStatus *string `json:"agentStatus,omitempty"` StoppedReason *string `json:"stoppedReason,omitempty"` ReconciledRepos []ReconciledRepo `json:"reconciledRepos,omitempty"` ReconciledWorkflow *ReconciledWorkflow `json:"reconciledWorkflow,omitempty"` diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go index 18a8e211a..002934a8a 100644 --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -257,10 +257,13 @@ func HandleAGUIRunProxy(c *gin.Context) { log.Printf("AGUI Proxy: run=%s session=%s/%s msgs=%d", truncID(runID), projectName, sessionName, len(rawMessages)) + // Use namespace-qualified session ID to avoid cross-project collisions + namespacedSessionID := projectName + "/" + sessionName + sessionLastSeen.Store(sessionName, time.Now()) // Store project→session mapping for activity tracking in persistStreamedEvent - sessionProjectMap.Store(sessionName, projectName) + sessionProjectMap.Store(namespacedSessionID, projectName) // Resolve and cache the runner port for this session from the registry. cacheSessionPort(projectName, sessionName) @@ -297,7 +300,7 @@ func HandleAGUIRunProxy(c *gin.Context) { runnerURL := getRunnerEndpoint(projectName, sessionName) // Start background goroutine to proxy runner SSE → persist + broadcast - go proxyRunnerStream(runnerURL, bodyBytes, sessionName, runID, threadID) + go proxyRunnerStream(runnerURL, bodyBytes, sessionName, namespacedSessionID, runID, threadID) // Return metadata immediately — events arrive via GET /agui/events c.JSON(http.StatusOK, gin.H{ @@ -309,13 +312,14 @@ func HandleAGUIRunProxy(c *gin.Context) { // proxyRunnerStream connects to the runner's SSE endpoint, reads events, // persists them, and publishes them to the live broadcast pipe. Runs in // a background goroutine so the POST /agui/run handler can return immediately. -func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, threadID string) { +// namespacedSessionID is the namespace-qualified session ID (e.g., "namespace/sessionName") for event persistence. +func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, namespacedSessionID, runID, threadID string) { log.Printf("AGUI Proxy: connecting to runner at %s", runnerURL) resp, err := connectToRunner(runnerURL, bodyBytes) if err != nil { log.Printf("AGUI Proxy: runner unavailable for %s: %v", sessionName, err) // Publish error events so GET /agui/events subscribers see the failure - publishAndPersistErrorEvents(sessionName, runID, threadID, "Runner is not available") + publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, "Runner is not available") return } defer resp.Body.Close() @@ -323,7 +327,7 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, t if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) log.Printf("AGUI Proxy: runner returned %d: %s", resp.StatusCode, string(body)) - publishAndPersistErrorEvents(sessionName, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode)) + publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode)) return } @@ -343,7 +347,7 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, t // Persist every data event to JSONL if strings.HasPrefix(trimmed, "data: ") { jsonData := strings.TrimPrefix(trimmed, "data: ") - persistStreamedEvent(sessionName, runID, threadID, jsonData) + persistStreamedEvent(namespacedSessionID, runID, threadID, jsonData) } // Publish raw SSE line to all GET /agui/events subscribers @@ -356,14 +360,15 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, t // publishAndPersistErrorEvents generates RUN_STARTED + RUN_ERROR events, // persists them, and publishes to the live broadcast so subscribers get // notified of runner failures. -func publishAndPersistErrorEvents(sessionName, runID, threadID, message string) { +// sessionName is used for broadcasting; namespacedSessionID is used for persistence. +func publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, message string) { // RUN_STARTED startEvt := map[string]interface{}{ "type": "RUN_STARTED", "threadId": threadID, "runId": runID, } - persistEvent(sessionName, startEvt) + persistEvent(namespacedSessionID, startEvt) startData, _ := json.Marshal(startEvt) publishLine(sessionName, fmt.Sprintf("data: %s\n\n", startData)) @@ -374,7 +379,7 @@ func publishAndPersistErrorEvents(sessionName, runID, threadID, message string) "threadId": threadID, "runId": runID, } - persistEvent(sessionName, errEvt) + persistEvent(namespacedSessionID, errEvt) errData, _ := json.Marshal(errEvt) publishLine(sessionName, fmt.Sprintf("data: %s\n\n", errData)) } @@ -436,15 +441,19 @@ func persistStreamedEvent(sessionID, runID, threadID, jsonData string) { persistEvent(sessionID, event) - // Update lastActivityTime on CR for activity events (debounced). - // Extract event type to check; projectName is derived from the + // Extract event type; projectName is derived from the // sessionID-to-project mapping populated by HandleAGUIRunProxy. eventType, _ := event["type"].(string) + + // Update lastActivityTime on CR for activity events (debounced). if isActivityEvent(eventType) { if projectName, ok := sessionProjectMap.Load(sessionID); ok { updateLastActivityTime(projectName.(string), sessionID, eventType == types.EventTypeRunStarted) } } + + // agentStatus is derived at query time from the event log (DeriveAgentStatus). + // No CR updates needed here — the persisted events ARE the source of truth. } // ─── POST /agui/interrupt ──────────────────────────────────────────── @@ -945,3 +954,16 @@ func updateLastActivityTime(projectName, sessionName string, immediate bool) { } }() } + +// isAskUserQuestionToolCall checks if a tool call name is the AskUserQuestion HITL tool. +// Uses case-insensitive comparison after stripping non-alpha characters, +// matching the frontend pattern in use-agent-status.ts. +func isAskUserQuestionToolCall(name string) bool { + var clean strings.Builder + for _, r := range strings.ToLower(name) { + if r >= 'a' && r <= 'z' { + clean.WriteRune(r) + } + } + return clean.String() == "askuserquestion" +} diff --git a/components/backend/websocket/agui_store.go b/components/backend/websocket/agui_store.go index a6ae9deec..2d03c8c2c 100644 --- a/components/backend/websocket/agui_store.go +++ b/components/backend/websocket/agui_store.go @@ -11,6 +11,7 @@ package websocket import ( "ambient-code-backend/types" + "bytes" "encoding/json" "fmt" "log" @@ -194,6 +195,110 @@ func loadEvents(sessionID string) []map[string]interface{} { return events } +// DeriveAgentStatus reads a session's event log and returns the agent +// status derived from the last significant events. +// +// sessionID should be namespace-qualified (e.g., "namespace/sessionName") to avoid cross-project collisions. +// Returns "" if the status cannot be determined (no events, file missing, etc.). +func DeriveAgentStatus(sessionID string) string { + // sessionID is now namespace-qualified, e.g., "default/session-123" + path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) + + // Read only the tail of the file to avoid loading entire event log into memory. + // 64KB is sufficient for recent lifecycle events (scanning backwards). + const maxTailBytes = 64 * 1024 + + file, err := os.Open(path) + if err != nil { + return "" + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return "" + } + + fileSize := stat.Size() + var data []byte + + if fileSize <= maxTailBytes { + // File is small, read it all + data, err = os.ReadFile(path) + if err != nil { + return "" + } + } else { + // File is large, seek to tail and read last N bytes + offset := fileSize - maxTailBytes + _, err = file.Seek(offset, 0) + if err != nil { + return "" + } + + data = make([]byte, maxTailBytes) + n, err := file.Read(data) + if err != nil { + return "" + } + data = data[:n] + + // Skip partial first line (we seeked into the middle of a line) + if idx := bytes.IndexByte(data, '\n'); idx >= 0 { + data = data[idx+1:] + } + } + + lines := splitLines(data) + + // Scan backwards. We only care about lifecycle and AskUserQuestion events. + // RUN_STARTED → "working" + // RUN_FINISHED / RUN_ERROR → "idle", unless same run had AskUserQuestion + // TOOL_CALL_START (AskUserQuestion) → "waiting_input" + var runEndRunID string // set when we hit RUN_FINISHED/RUN_ERROR and need to look deeper + for i := len(lines) - 1; i >= 0; i-- { + if len(lines[i]) == 0 { + continue + } + var evt map[string]interface{} + if err := json.Unmarshal(lines[i], &evt); err != nil { + continue + } + evtType, _ := evt["type"].(string) + + switch evtType { + case types.EventTypeRunStarted: + if runEndRunID != "" { + // We were scanning for an AskUserQuestion but hit RUN_STARTED first → idle + return types.AgentStatusIdle + } + return types.AgentStatusWorking + + case types.EventTypeRunFinished, types.EventTypeRunError: + if runEndRunID == "" { + // First run-end seen; scan deeper within this run for AskUserQuestion + runEndRunID, _ = evt["runId"].(string) + } + + case types.EventTypeToolCallStart: + if runEndRunID != "" { + // Only relevant if we're scanning within the ended run + if evtRunID, _ := evt["runId"].(string); evtRunID != "" && evtRunID != runEndRunID { + return types.AgentStatusIdle + } + } + if toolName, _ := evt["toolCallName"].(string); isAskUserQuestionToolCall(toolName) { + return types.AgentStatusWaitingInput + } + } + } + + if runEndRunID != "" { + return types.AgentStatusIdle + } + return "" +} + // ─── Compaction ────────────────────────────────────────────────────── // // Go port of @ag-ui/client compactEvents. Concatenates streaming deltas diff --git a/components/backend/websocket/agui_store_test.go b/components/backend/websocket/agui_store_test.go new file mode 100644 index 000000000..f9fbdd4d5 --- /dev/null +++ b/components/backend/websocket/agui_store_test.go @@ -0,0 +1,228 @@ +package websocket + +import ( + "ambient-code-backend/types" + "encoding/json" + "os" + "path/filepath" + "testing" +) + +func TestDeriveAgentStatus(t *testing.T) { + // Create a temporary directory for test files + tmpDir, err := os.MkdirTemp("", "agui-store-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + // Set the StateBaseDir to our temp directory for testing + origStateBaseDir := StateBaseDir + StateBaseDir = tmpDir + defer func() { StateBaseDir = origStateBaseDir }() + + t.Run("empty file returns empty status", func(t *testing.T) { + sessionID := "test-session-empty" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create empty events file + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + if err := os.WriteFile(eventsFile, []byte(""), 0644); err != nil { + t.Fatalf("Failed to write events file: %v", err) + } + + status := DeriveAgentStatus(sessionID) + if status != "" { + t.Errorf("Expected empty status for empty file, got %q", status) + } + }) + + t.Run("RUN_STARTED only returns working", func(t *testing.T) { + sessionID := "test-session-run-started" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with RUN_STARTED event + event := map[string]interface{}{ + "type": types.EventTypeRunStarted, + "runId": "run-123", + } + eventData, _ := json.Marshal(event) + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + if err := os.WriteFile(eventsFile, append(eventData, '\n'), 0644); err != nil { + t.Fatalf("Failed to write events file: %v", err) + } + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusWorking { + t.Errorf("Expected %q for RUN_STARTED, got %q", types.AgentStatusWorking, status) + } + }) + + t.Run("RUN_FINISHED returns idle", func(t *testing.T) { + sessionID := "test-session-run-finished" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with RUN_STARTED then RUN_FINISHED + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeRunFinished, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusIdle { + t.Errorf("Expected %q for RUN_FINISHED, got %q", types.AgentStatusIdle, status) + } + }) + + t.Run("RUN_FINISHED with same-run AskUserQuestion returns waiting_input", func(t *testing.T) { + sessionID := "test-session-waiting-input" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with RUN_STARTED, AskUserQuestion TOOL_CALL_START, then RUN_FINISHED + // Scanning backwards: RUN_FINISHED → looks deeper → finds AskUserQuestion with same runId + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeToolCallStart, "runId": "run-123", "toolCallName": "AskUserQuestion"}, + {"type": types.EventTypeRunFinished, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusWaitingInput { + t.Errorf("Expected %q for same-run AskUserQuestion, got %q", types.AgentStatusWaitingInput, status) + } + }) + + t.Run("RUN_FINISHED with different-run AskUserQuestion returns idle", func(t *testing.T) { + sessionID := "test-session-different-run" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with old AskUserQuestion from run-456, then run-123 finishes + // Scanning backwards: RUN_FINISHED(run-123) → looks deeper → finds AskUserQuestion(run-456) → different run → idle + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-456"}, + {"type": types.EventTypeToolCallStart, "runId": "run-456", "toolCallName": "AskUserQuestion"}, + {"type": types.EventTypeRunFinished, "runId": "run-456"}, + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeRunFinished, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusIdle { + t.Errorf("Expected %q for different-run AskUserQuestion, got %q", types.AgentStatusIdle, status) + } + }) + + t.Run("RUN_ERROR returns idle", func(t *testing.T) { + sessionID := "test-session-run-error" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Create events file with RUN_STARTED then RUN_ERROR + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeRunError, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusIdle { + t.Errorf("Expected %q for RUN_ERROR, got %q", types.AgentStatusIdle, status) + } + }) + + t.Run("case-insensitive AskUserQuestion detection", func(t *testing.T) { + sessionID := "test-session-case-insensitive" + sessionsDir := filepath.Join(tmpDir, "sessions", sessionID) + if err := os.MkdirAll(sessionsDir, 0755); err != nil { + t.Fatalf("Failed to create sessions dir: %v", err) + } + + // Test various casings of AskUserQuestion + testCases := []string{"askuserquestion", "ASKUSERQUESTION", "AskUserQuestion", "AsKuSeRqUeStIoN"} + for _, toolName := range testCases { + events := []map[string]interface{}{ + {"type": types.EventTypeRunStarted, "runId": "run-123"}, + {"type": types.EventTypeToolCallStart, "runId": "run-123", "toolCallName": toolName}, + {"type": types.EventTypeRunFinished, "runId": "run-123"}, + } + eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl") + f, err := os.Create(eventsFile) + if err != nil { + t.Fatalf("Failed to create events file: %v", err) + } + for _, evt := range events { + data, _ := json.Marshal(evt) + f.Write(append(data, '\n')) + } + f.Close() + + status := DeriveAgentStatus(sessionID) + if status != types.AgentStatusWaitingInput { + t.Errorf("Expected %q for toolName %q, got %q", types.AgentStatusWaitingInput, toolName, status) + } + } + }) + + t.Run("non-existent session returns empty status", func(t *testing.T) { + status := DeriveAgentStatus("non-existent-session") + if status != "" { + t.Errorf("Expected empty status for non-existent session, got %q", status) + } + }) +} diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx index b0f4dad0e..ffbe9ac06 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx @@ -66,7 +66,9 @@ import { } from "@/components/ui/breadcrumb"; import Link from "next/link"; import { SessionHeader } from "./session-header"; -import { getPhaseColor } from "@/utils/session-helpers"; +import { SessionStatusDot } from "@/components/session-status-dot"; +import { AgentStatusIndicator } from "@/components/agent-status-indicator"; +import { useAgentStatus } from "@/hooks/use-agent-status"; // Extracted components import { AddContextModal } from "./components/modals/add-context-modal"; @@ -775,6 +777,13 @@ export default function ProjectSessionDetailPage({ handleWorkflowChange(workflowId); }; + // Derive agent-level status from session data and messages + const agentStatus = useAgentStatus( + session?.status?.phase || "Pending", + isRunActive, + aguiStream.state.messages, + ); + // Phase 1: convert committed messages + streaming tool cards into display format. // Does NOT depend on currentMessage / currentReasoning so it skips the full // O(n) traversal during text-streaming deltas (the most frequent event type). @@ -923,6 +932,12 @@ export default function ProjectSessionDetailPage({ // Handle text content by role if (msg.role === "user") { + // Hide AskUserQuestion response messages from the chat + const msgMeta = msg.metadata as Record | undefined; + if (msgMeta?.type === "ask_user_question_response") { + continue; + } + result.push({ type: "user_message", id: msg.id, // Preserve message ID for feedback association @@ -1383,6 +1398,18 @@ export default function ProjectSessionDetailPage({ } }; + // Send an AskUserQuestion response (hidden from chat, properly formatted) + const sendToolAnswer = async (formattedAnswer: string) => { + try { + await aguiSendMessage(formattedAnswer, { + type: "ask_user_question_response", + }); + } catch (err) { + toast.error(err instanceof Error ? err.message : "Failed to send answer"); + throw err; + } + }; + const handleCommandClick = async (slashCommand: string) => { try { await aguiSendMessage(slashCommand); @@ -1479,13 +1506,8 @@ export default function ProjectSessionDetailPage({ {session.spec.displayName || session.metadata.name} - - {session.status?.phase || "Pending"} - + + {agentName && ( {agentName} / {session.spec.llmSettings.model} @@ -1519,13 +1541,8 @@ export default function ProjectSessionDetailPage({ {session.spec.displayName || session.metadata.name} - - {session.status?.phase || "Pending"} - + + @@ -2627,6 +2644,7 @@ export default function ProjectSessionDetailPage({ chatInput={chatInput} setChatInput={setChatInput} onSendChat={() => Promise.resolve(sendChat())} + onSendToolAnswer={sendToolAnswer} onInterrupt={aguiInterrupt} onGoToResults={() => {}} onContinue={handleContinue} @@ -2705,6 +2723,7 @@ export default function ProjectSessionDetailPage({ chatInput={chatInput} setChatInput={setChatInput} onSendChat={() => Promise.resolve(sendChat())} + onSendToolAnswer={sendToolAnswer} onInterrupt={aguiInterrupt} onGoToResults={() => {}} onContinue={handleContinue} diff --git a/components/frontend/src/components/agent-status-indicator.tsx b/components/frontend/src/components/agent-status-indicator.tsx new file mode 100644 index 000000000..d5278b5d7 --- /dev/null +++ b/components/frontend/src/components/agent-status-indicator.tsx @@ -0,0 +1,109 @@ +"use client"; + +import { cn } from "@/lib/utils"; +import { Badge } from "@/components/ui/badge"; +import { + Loader2, + CheckCircle2, + XCircle, + Circle, + HelpCircle, +} from "lucide-react"; +import type { AgentStatus } from "@/types/agentic-session"; + +type AgentStatusIndicatorProps = { + status: AgentStatus; + compact?: boolean; + className?: string; +}; + +export function AgentStatusIndicator({ + status, + compact = false, + className, +}: AgentStatusIndicatorProps) { + switch (status) { + case "working": + return ( +
+ + {!compact && ( + + Working + + )} +
+ ); + + case "waiting_input": + return ( + + + {compact ? "Input" : "Needs Input"} + + ); + + case "completed": + return ( +
+ + {!compact && ( + + Completed + + )} +
+ ); + + case "failed": + return ( +
+ + {!compact && ( + + Failed + + )} +
+ ); + + case "idle": + return ( +
+ + {!compact && ( + Idle + )} +
+ ); + + default: + return null; + } +} diff --git a/components/frontend/src/components/session-status-dot.tsx b/components/frontend/src/components/session-status-dot.tsx new file mode 100644 index 000000000..65488075c --- /dev/null +++ b/components/frontend/src/components/session-status-dot.tsx @@ -0,0 +1,58 @@ +"use client"; + +import { cn } from "@/lib/utils"; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from "@/components/ui/tooltip"; +import type { AgenticSessionPhase } from "@/types/agentic-session"; + +type SessionStatusDotProps = { + phase: AgenticSessionPhase | string; + className?: string; +}; + +const DOT_COLORS: Record = { + Running: "bg-blue-500", + Completed: "bg-gray-400", + Stopped: "bg-gray-400", + Failed: "bg-red-500", + Pending: "bg-orange-400", + Creating: "bg-orange-400", + Stopping: "bg-orange-400", +}; + +const DOT_ANIMATIONS: Record = { + Creating: "animate-pulse", + Stopping: "animate-pulse", +}; + +export function SessionStatusDot({ phase, className }: SessionStatusDotProps) { + const color = DOT_COLORS[phase] || "bg-gray-400"; + const animation = DOT_ANIMATIONS[phase] || ""; + + return ( + + + + + ); + })} + + )} + + {/* Question text */} +

{currentQuestion.question}

+ + {/* Options */} + {renderQuestionOptions(currentQuestion)} + + {/* Footer */} + {!disabled && ( +
+
+ {isMultiQuestion + ? `${questions.filter(isQuestionAnswered).length}/${questions.length} answered` + : selections[currentQuestion.question] + ? "" + : "Select an option"} +
+ +
+ {isMultiQuestion && activeTab < questions.length - 1 && ( + + )} + + {(!isMultiQuestion || activeTab === questions.length - 1) && ( + + )} +
+
+ )} + + + + + ); +}; + +AskUserQuestionMessage.displayName = "AskUserQuestionMessage"; diff --git a/components/frontend/src/components/ui/stream-message.tsx b/components/frontend/src/components/ui/stream-message.tsx index bcd38db13..b7df86ed2 100644 --- a/components/frontend/src/components/ui/stream-message.tsx +++ b/components/frontend/src/components/ui/stream-message.tsx @@ -4,6 +4,7 @@ import React from "react"; import { MessageObject, ToolUseMessages, HierarchicalToolMessage } from "@/types/agentic-session"; import { LoadingDots, Message } from "@/components/ui/message"; import { ToolMessage } from "@/components/ui/tool-message"; +import { AskUserQuestionMessage } from "@/components/session/ask-user-question"; import { ThinkingMessage } from "@/components/ui/thinking-message"; import { SystemMessage } from "@/components/ui/system-message"; import { Button } from "@/components/ui/button"; @@ -12,11 +13,17 @@ import { FeedbackButtons } from "@/components/feedback"; export type StreamMessageProps = { message: (MessageObject | ToolUseMessages | HierarchicalToolMessage) & { streaming?: boolean }; onGoToResults?: () => void; + onSubmitAnswer?: (formattedAnswer: string) => Promise; plainCard?: boolean; isNewest?: boolean; agentName?: string; }; +function isAskUserQuestionTool(name: string): boolean { + const normalized = name.toLowerCase().replace(/[^a-z]/g, ""); + return normalized === "askuserquestion"; +} + const getRandomAgentMessage = () => { const messages = [ "The agents are working together on your request...", @@ -33,11 +40,24 @@ const getRandomAgentMessage = () => { return messages[Math.floor(Math.random() * messages.length)]; }; -export const StreamMessage: React.FC = ({ message, onGoToResults, plainCard=false, isNewest=false, agentName }) => { +export const StreamMessage: React.FC = ({ message, onGoToResults, onSubmitAnswer, plainCard=false, isNewest=false, agentName }) => { const isToolUsePair = (m: MessageObject | ToolUseMessages | HierarchicalToolMessage): m is ToolUseMessages | HierarchicalToolMessage => m != null && typeof m === "object" && "toolUseBlock" in m && "resultBlock" in m; if (isToolUsePair(message)) { + // Render AskUserQuestion with a custom interactive component + if (isAskUserQuestionTool(message.toolUseBlock.name)) { + return ( + + ); + } + // Check if this is a hierarchical message with children const hierarchical = message as HierarchicalToolMessage; return ( diff --git a/components/frontend/src/components/ui/tool-message.tsx b/components/frontend/src/components/ui/tool-message.tsx index 8136ddbae..512fd259b 100644 --- a/components/frontend/src/components/ui/tool-message.tsx +++ b/components/frontend/src/components/ui/tool-message.tsx @@ -308,6 +308,17 @@ const extractTextFromResultContent = (content: unknown): string => { const generateToolSummary = (toolName: string, input?: Record): string => { if (!input || Object.keys(input).length === 0) return formatToolName(toolName); + // AskUserQuestion - show first question text + if (toolName.toLowerCase().replace(/[^a-z]/g, "") === "askuserquestion") { + const questions = input.questions as Array<{ question: string }> | undefined; + if (questions?.length) { + const suffix = questions.length > 1 ? ` (+${questions.length - 1} more)` : ""; + return `Asking: "${questions[0].question}"${suffix}`; + } + return "Asking a question"; + } + + // WebSearch - show query if (toolName.toLowerCase().includes("websearch") || toolName.toLowerCase().includes("web_search")) { const query = input.query as string | undefined; diff --git a/components/frontend/src/components/workspace-sections/sessions-section.tsx b/components/frontend/src/components/workspace-sections/sessions-section.tsx index 2ffaa78f5..4a92fd777 100644 --- a/components/frontend/src/components/workspace-sections/sessions-section.tsx +++ b/components/frontend/src/components/workspace-sections/sessions-section.tsx @@ -22,7 +22,9 @@ import { } from '@/components/ui/pagination'; import { getPageNumbers } from '@/lib/pagination'; import { EmptyState } from '@/components/empty-state'; -import { SessionPhaseBadge } from '@/components/status-badge'; +import { SessionStatusDot } from '@/components/session-status-dot'; +import { AgentStatusIndicator } from '@/components/agent-status-indicator'; +import { deriveAgentStatusFromPhase } from '@/hooks/use-agent-status'; import { CreateSessionDialog } from '@/components/create-session-dialog'; import { EditSessionNameDialog } from '@/components/edit-session-name-dialog'; @@ -259,6 +261,7 @@ export function SessionsSection({ projectName }: SessionsSectionProps) { + Name Status Model @@ -277,6 +280,9 @@ export function SessionsSection({ projectName }: SessionsSectionProps) { return ( + + + @@ -298,7 +304,10 @@ export function SessionsSection({ projectName }: SessionsSectionProps) {

{session.spec.displayName || session.metadata.name}

- + {session.spec.displayName && (

{session.metadata.name}

@@ -326,7 +335,9 @@ export function SessionsSection({ projectName }: SessionsSectionProps) {
- +
diff --git a/components/frontend/src/hooks/agui/types.ts b/components/frontend/src/hooks/agui/types.ts index 3622ead09..ad3bd458f 100644 --- a/components/frontend/src/hooks/agui/types.ts +++ b/components/frontend/src/hooks/agui/types.ts @@ -25,7 +25,7 @@ export type UseAGUIStreamReturn = { state: AGUIClientState connect: (runId?: string) => void disconnect: () => void - sendMessage: (content: string) => Promise + sendMessage: (content: string, metadata?: Record) => Promise interrupt: () => Promise isConnected: boolean isStreaming: boolean diff --git a/components/frontend/src/hooks/use-agent-status.ts b/components/frontend/src/hooks/use-agent-status.ts new file mode 100644 index 000000000..b6fff8912 --- /dev/null +++ b/components/frontend/src/hooks/use-agent-status.ts @@ -0,0 +1,83 @@ +import { useMemo } from "react"; +import type { + AgenticSessionPhase, + AgentStatus, +} from "@/types/agentic-session"; +import type { PlatformMessage } from "@/types/agui"; + +function isAskUserQuestionTool(name: string): boolean { + const normalized = name.toLowerCase().replace(/[^a-z]/g, ""); + return normalized === "askuserquestion"; +} + +/** + * Derive agent status from session data and the raw AG-UI message stream. + * + * For the session detail page where the full message stream is available, + * this provides accurate status including `waiting_input` detection. + */ +export function useAgentStatus( + phase: AgenticSessionPhase | string, + isRunActive: boolean, + messages: PlatformMessage[], +): AgentStatus { + return useMemo(() => { + // Terminal states from session phase + if (phase === "Completed") return "completed"; + if (phase === "Failed") return "failed"; + if (phase === "Stopped") return "idle"; + + // Non-running phases + if (phase !== "Running") return "idle"; + + // Scan backwards for the last tool call to check for unanswered AskUserQuestion. + // Raw AG-UI messages store tool calls in msg.toolCalls[] (PlatformToolCall[]). + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]; + if (!msg.toolCalls || msg.toolCalls.length === 0) continue; + + // Check the last tool call on this message + const lastTc = msg.toolCalls[msg.toolCalls.length - 1]; + if (lastTc.function?.name && isAskUserQuestionTool(lastTc.function.name)) { + const hasResult = + lastTc.result !== undefined && + lastTc.result !== null && + lastTc.result !== ""; + if (!hasResult) { + return "waiting_input"; + } + } + + // Only check the most recent message with tool calls + break; + } + + // Active processing + if (isRunActive) return "working"; + + // Running but idle between turns + return "idle"; + }, [phase, isRunActive, messages]); +} + +/** + * Derive a simplified agent status from session phase alone. + * + * Used in the session list where per-session message streams are not available. + */ +export function deriveAgentStatusFromPhase( + phase: AgenticSessionPhase | string, +): AgentStatus { + switch (phase) { + case "Running": + return "working"; + case "Completed": + return "completed"; + case "Failed": + return "failed"; + case "Stopped": + return "idle"; + default: + return "idle"; + } +} diff --git a/components/frontend/src/hooks/use-agui-stream.ts b/components/frontend/src/hooks/use-agui-stream.ts index 1f1a08c8d..01f18508e 100644 --- a/components/frontend/src/hooks/use-agui-stream.ts +++ b/components/frontend/src/hooks/use-agui-stream.ts @@ -223,7 +223,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur // Send a message to start/continue the conversation // AG-UI server pattern: POST returns SSE stream directly const sendMessage = useCallback( - async (content: string) => { + async (content: string, metadata?: Record) => { // Send to backend via run endpoint - this returns an SSE stream const runUrl = `/api/projects/${encodeURIComponent(projectName)}/agentic-sessions/${encodeURIComponent(sessionName)}/agui/run` @@ -231,6 +231,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur id: crypto.randomUUID(), role: 'user' as const, content, + ...(metadata ? { metadata } : {}), } // Add user message to state immediately for instant UI feedback. diff --git a/components/frontend/src/services/queries/use-sessions.ts b/components/frontend/src/services/queries/use-sessions.ts index e1084b02d..026a2cc00 100644 --- a/components/frontend/src/services/queries/use-sessions.ts +++ b/components/frontend/src/services/queries/use-sessions.ts @@ -40,6 +40,32 @@ export function useSessionsPaginated(projectName: string, params: PaginationPara queryFn: () => sessionsApi.listSessionsPaginated(projectName, params), enabled: !!projectName, placeholderData: keepPreviousData, // Keep previous data while fetching new page + // Smart polling: tier interval based on the most active session in the list + refetchInterval: (query) => { + const data = query.state.data as { items?: AgenticSession[] } | undefined; + const items = data?.items; + if (!items?.length) return false; + + // Tier 1: Any session transitioning phases → poll aggressively (2s) + const hasTransitioning = items.some((s) => { + const phase = s.status?.phase; + return phase === 'Pending' || phase === 'Creating' || phase === 'Stopping'; + }); + if (hasTransitioning) return 2000; + + // Tier 2: Any session with agent actively working → moderate (5s) + const hasWorking = items.some((s) => { + return s.status?.phase === 'Running' && (!s.status?.agentStatus || s.status?.agentStatus === 'working'); + }); + if (hasWorking) return 5000; + + // Tier 3: Any session running but agent idle/waiting → slow (15s) + const hasRunning = items.some((s) => s.status?.phase === 'Running'); + if (hasRunning) return 15000; + + // Tier 4: All sessions terminal → no polling + return false; + }, }); } diff --git a/components/frontend/src/types/agentic-session.ts b/components/frontend/src/types/agentic-session.ts index 813684e91..06f7cb5e0 100644 --- a/components/frontend/src/types/agentic-session.ts +++ b/components/frontend/src/types/agentic-session.ts @@ -1,5 +1,34 @@ export type AgenticSessionPhase = "Pending" | "Creating" | "Running" | "Stopping" | "Stopped" | "Completed" | "Failed"; +// Agent status (derived from message stream, distinct from session phase) +export type AgentStatus = + | "working" // Actively processing + | "waiting_input" // AskUserQuestion pending, needs human response + | "completed" // Task finished successfully + | "failed" // Task errored + | "idle"; // Session running, agent between turns + +// Subset of AgentStatus that can be persisted in the CR status field +// (completed/failed are derived at query time from phase, not stored) +export type StoredAgentStatus = "working" | "idle" | "waiting_input"; + +// AskUserQuestion tool types (Claude Agent SDK built-in) +export type AskUserQuestionOption = { + label: string; + description?: string; +}; + +export type AskUserQuestionItem = { + question: string; + header?: string; + options: AskUserQuestionOption[]; + multiSelect?: boolean; +}; + +export type AskUserQuestionInput = { + questions: AskUserQuestionItem[]; +}; + export type LLMSettings = { model: string; temperature: number; @@ -162,6 +191,7 @@ export type AgenticSessionStatus = { startTime?: string; completionTime?: string; lastActivityTime?: string; + agentStatus?: StoredAgentStatus; stoppedReason?: "user" | "inactivity"; reconciledRepos?: ReconciledRepo[]; reconciledWorkflow?: ReconciledWorkflow; diff --git a/components/frontend/src/types/api/sessions.ts b/components/frontend/src/types/api/sessions.ts index 41913eacd..2b3302bc9 100644 --- a/components/frontend/src/types/api/sessions.ts +++ b/components/frontend/src/types/api/sessions.ts @@ -29,6 +29,10 @@ export type AgenticSessionPhase = | 'Completed' | 'Failed'; +// Subset of agent status values that can be persisted in the CR status field +// (completed/failed are derived at query time from phase, not stored) +export type StoredAgentStatus = "working" | "idle" | "waiting_input"; + export type LLMSettings = { model: string; temperature: number; @@ -91,6 +95,7 @@ export type AgenticSessionStatus = { startTime?: string; completionTime?: string; lastActivityTime?: string; + agentStatus?: StoredAgentStatus; stoppedReason?: "user" | "inactivity"; jobName?: string; runnerPodName?: string; diff --git a/components/package-lock.json b/components/package-lock.json deleted file mode 100644 index 9f787ae19..000000000 --- a/components/package-lock.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "components", - "lockfileVersion": 3, - "requires": true, - "packages": {} -} diff --git a/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py b/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py index 127729a52..538c5567f 100644 --- a/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py +++ b/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py @@ -29,6 +29,7 @@ ToolCallStartEvent, ToolCallArgsEvent, ToolCallEndEvent, + ToolCallResultEvent, StateSnapshotEvent, MessagesSnapshotEvent, ) @@ -70,6 +71,12 @@ emit_system_message_events, ) +# Built-in Claude tools that should halt the stream like frontend tools. +# These are HITL (human-in-the-loop) tools that require user input before +# the agent can continue. The adapter treats them identically to frontend +# tools registered via ``input_data.tools``. +BUILTIN_FRONTEND_TOOLS: set[str] = {"AskUserQuestion"} + logger = logging.getLogger(__name__) @@ -227,6 +234,21 @@ def __init__( # Current state tracking per run (for state management) self._current_state: Optional[Any] = None + # Whether the last run halted due to a frontend tool (caller should interrupt) + self._halted: bool = False + # Tool call ID that caused the halt (so we can emit TOOL_CALL_RESULT on next run) + self._halted_tool_call_id: Optional[str] = None + + @property + def halted(self) -> bool: + """Whether the last run halted due to a frontend tool. + + When ``True`` the caller should interrupt the underlying SDK client + to prevent it from auto-approving the halted tool call with a + placeholder result. + """ + return self._halted + async def run( self, input_data: RunAgentInput, @@ -252,8 +274,13 @@ async def run( thread_id = input_data.thread_id or str(uuid.uuid4()) run_id = input_data.run_id or str(uuid.uuid4()) - # Clear result data from any previous run + # Capture halted tool call ID before clearing (for TOOL_CALL_RESULT emission) + previous_halted_tool_call_id = self._halted_tool_call_id + + # Clear result data and halt flag from any previous run self._last_result_data = None + self._halted = False + self._halted_tool_call_id = None # Initialize state tracking for this run self._current_state = input_data.state @@ -287,6 +314,20 @@ async def run( # Process all messages and extract user message user_message, _ = process_messages(input_data) + # If the previous run halted for a frontend tool (e.g. AskUserQuestion), + # emit a TOOL_CALL_RESULT so the frontend can mark the question as answered. + if previous_halted_tool_call_id and user_message: + yield ToolCallResultEvent( + type=EventType.TOOL_CALL_RESULT, + thread_id=thread_id, + run_id=run_id, + message_id=f"{previous_halted_tool_call_id}-result", + tool_call_id=previous_halted_tool_call_id, + content=user_message, + role="tool", + timestamp=now_ms(), + ) + # Extract frontend tool names for halt detection (like Strands pattern) frontend_tool_names = ( set(extract_tool_names(input_data.tools)) if input_data.tools else set() @@ -836,9 +877,12 @@ def flush_pending_msg(): ) # Check if this is a frontend tool (using unprefixed name for comparison) - # Frontend tools should halt the stream so client can execute handler + # Frontend tools should halt the stream so client can execute handler. + # Also halt for built-in HITL tools (e.g. AskUserQuestion) that + # require user input before the agent can continue. is_frontend_tool = ( current_tool_display_name in frontend_tool_names + or current_tool_display_name in BUILTIN_FRONTEND_TOOLS ) if is_frontend_tool: @@ -869,9 +913,20 @@ def flush_pending_msg(): ) # NOTE: interrupt is the caller's responsibility - # (e.g. worker.interrupt() from the platform layer) + # (e.g. worker.interrupt() from the platform layer). + # Check adapter.halted after the stream ends. + self._halted = True + self._halted_tool_call_id = current_tool_call_id halt_event_stream = True + + # Clear in-flight tool call state to prevent duplicate + # ToolCallEndEvent emission in the finally block + current_tool_call_id = None + current_tool_call_name = None + current_tool_display_name = None + accumulated_tool_json = "" + # Continue consuming remaining events for cleanup continue diff --git a/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py b/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py index 36da70cbd..41ff07da5 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/claude/bridge.py @@ -56,6 +56,8 @@ def __init__(self) -> None: self._allowed_tools: list[str] = [] self._system_prompt: dict = {} self._stderr_lines: list[str] = [] + # Per-thread halt tracking to avoid race conditions on shared adapter + self._halted_by_thread: dict[str, bool] = {} # ------------------------------------------------------------------ # PlatformBridge interface @@ -121,6 +123,22 @@ async def run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: async for event in wrapped_stream: yield event + # Capture halt state for this thread to avoid race conditions + # with concurrent runs modifying the shared adapter's halted flag + self._halted_by_thread[thread_id] = self._adapter.halted + + # If the adapter halted (frontend tool or built-in HITL tool like + # AskUserQuestion), interrupt the worker to prevent the SDK from + # auto-approving the tool call with a placeholder result. + if self._halted_by_thread.get(thread_id, False): + logger.info( + f"Adapter halted for thread={thread_id}, " + "interrupting worker to await user input" + ) + await worker.interrupt() + # Clear the halt flag for this thread + self._halted_by_thread.pop(thread_id, None) + self._first_run = False async def interrupt(self, thread_id: Optional[str] = None) -> None: @@ -166,6 +184,7 @@ def mark_dirty(self) -> None: self._ready = False self._first_run = True self._adapter = None + self._halted_by_thread.clear() if self._session_manager: manager = self._session_manager self._session_manager = None diff --git a/components/runners/ambient-runner/ambient_runner/platform/prompts.py b/components/runners/ambient-runner/ambient_runner/platform/prompts.py index 85b1a6183..d30d01031 100644 --- a/components/runners/ambient-runner/ambient_runner/platform/prompts.py +++ b/components/runners/ambient-runner/ambient_runner/platform/prompts.py @@ -85,6 +85,15 @@ "Provide honest, calibrated scores with clear reasoning.\n\n" ) +HUMAN_INPUT_INSTRUCTIONS = ( + "## Human-in-the-Loop\n" + "When you need user input, a decision, or confirmation before proceeding, " + "you MUST use the AskUserQuestion tool. Do not ask questions in plain text " + "and wait for a response — the AskUserQuestion tool triggers platform " + "notifications and status indicators that help users know you need their " + "attention.\n\n" +) + RESTART_TOOL_DESCRIPTION = ( "Restart the Claude session to recover from issues, clear state, " "or get a fresh connection. Use this if you detect you're in a " @@ -206,6 +215,9 @@ def build_workspace_context_prompt( prompt += f"- **repos/{repo_name}/**\n" prompt += GIT_PUSH_STEPS.format(branch=push_branch) + # Human-in-the-loop instructions + prompt += HUMAN_INPUT_INSTRUCTIONS + # MCP integration setup instructions prompt += MCP_INTEGRATIONS_PROMPT