-
Notifications
You must be signed in to change notification settings - Fork 61
feat: human-in-the-loop support with AskUserQuestion #871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5d67e8f
615dd0a
4d033de
a1e4ef8
fb3a8d8
4585a54
5ed8e63
014bd0f
5cd1cec
d97c923
07d469b
bbe7175
c248f63
d30db53
9f29f5d
7965b66
082343f
044420c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,6 +88,7 @@ dmypy.json | |
|
|
||
| # Claude Code | ||
| .claude/settings.local.json | ||
| .claude/worktrees/ | ||
|
|
||
| # mkdocs | ||
| /site | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,10 @@ var ( | |
| GetGitHubToken func(context.Context, kubernetes.Interface, dynamic.Interface, string, string) (string, error) | ||
| GetGitLabToken func(context.Context, kubernetes.Interface, string, string) (string, error) | ||
| DeriveRepoFolderFromURL func(string) string | ||
| // DeriveAgentStatusFromEvents derives agentStatus from the persisted event log. | ||
| // Set by the websocket package at init to avoid circular imports. | ||
| // sessionID should be namespace-qualified (e.g., "namespace/sessionName") to avoid cross-project collisions. | ||
| DeriveAgentStatusFromEvents func(sessionID string) string | ||
| // LEGACY: SendMessageToSession removed - AG-UI server uses HTTP/SSE instead of WebSocket | ||
| ) | ||
|
|
||
|
|
@@ -361,6 +365,28 @@ func parseStatus(status map[string]interface{}) *types.AgenticSessionStatus { | |
|
|
||
| // V2 API Handlers - Multi-tenant session management | ||
|
|
||
| // enrichAgentStatus derives agentStatus from the persisted event log for | ||
| // Running sessions. This is the source of truth — it replaces the stale | |
| // CR-cached value which was subject to goroutine race conditions. | ||
| func enrichAgentStatus(session *types.AgenticSession) { | ||
| if session.Status == nil || session.Status.Phase != "Running" { | ||
| return | ||
| } | ||
| if DeriveAgentStatusFromEvents == nil { | ||
| return | ||
| } | ||
| name, _ := session.Metadata["name"].(string) | ||
| namespace, _ := session.Metadata["namespace"].(string) | ||
| if name == "" || namespace == "" { | ||
| return | ||
| } | ||
| // Use namespace-qualified key to avoid cross-project collisions in the event store | ||
| sessionID := namespace + "/" + name | ||
| if derived := DeriveAgentStatusFromEvents(sessionID); derived != "" { | ||
| session.Status.AgentStatus = types.StringPtr(derived) | ||
| } | ||
| } | ||
|
|
||
| func ListSessions(c *gin.Context) { | ||
| project := c.GetString("project") | ||
|
|
||
|
|
@@ -431,6 +457,11 @@ func ListSessions(c *gin.Context) { | |
| totalCount := len(sessions) | ||
| paginatedSessions, hasMore, nextOffset := paginateSessions(sessions, params.Offset, params.Limit) | ||
|
|
||
| // Derive agentStatus from event log only for paginated sessions (performance optimization) | ||
| for i := range paginatedSessions { | ||
| enrichAgentStatus(&paginatedSessions[i]) | ||
| } | ||
|
|
||
| response := types.PaginatedResponse{ | ||
| Items: paginatedSessions, | ||
| TotalCount: totalCount, | ||
|
|
@@ -645,9 +676,9 @@ func CreateSession(c *gin.Context) { | |
| timeout = *req.Timeout | ||
| } | ||
|
|
||
| // Generate unique name (timestamp-based) | ||
| // Generate unique name (millisecond timestamp for burst-creation safety) | ||
| // Note: Runner will create branch as "ambient/{session-name}" | ||
| timestamp := time.Now().Unix() | ||
| timestamp := time.Now().UnixMilli() | ||
| name := fmt.Sprintf("session-%d", timestamp) | ||
|
Comment on lines
+679
to
682
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Millisecond timestamps still allow name collisions. Two sessions created within the same millisecond get the same name. As per coding guidelines, "Handle errors and edge cases explicitly rather than ignoring them." 🤖 Prompt for AI Agents |
||
|
|
||
| // Create the custom resource | ||
|
|
@@ -903,6 +934,9 @@ func GetSession(c *gin.Context) { | |
| session.Status = parseStatus(status) | ||
| } | ||
|
|
||
| // Derive agentStatus from event log (source of truth) for running sessions | ||
| enrichAgentStatus(&session) | ||
|
|
||
| session.AutoBranch = ComputeAutoBranch(sessionName) | ||
|
|
||
| c.JSON(http.StatusOK, session) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ package websocket | |
|
|
||
| import ( | ||
| "ambient-code-backend/types" | ||
| "bytes" | ||
| "encoding/json" | ||
| "fmt" | ||
| "log" | ||
|
|
@@ -194,6 +195,110 @@ func loadEvents(sessionID string) []map[string]interface{} { | |
| return events | ||
| } | ||
|
|
||
| // DeriveAgentStatus reads a session's event log and returns the agent | ||
| // status derived from the last significant events. | ||
| // | ||
| // sessionID should be namespace-qualified (e.g., "namespace/sessionName") to avoid cross-project collisions. | ||
| // Returns "" if the status cannot be determined (no events, file missing, etc.). | ||
| func DeriveAgentStatus(sessionID string) string { | ||
| // sessionID is now namespace-qualified, e.g., "default/session-123" | ||
| path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID) | ||
|
|
||
| // Read only the tail of the file to avoid loading entire event log into memory. | ||
| // 64KB is sufficient for recent lifecycle events (scanning backwards). | ||
| const maxTailBytes = 64 * 1024 | ||
|
|
||
| file, err := os.Open(path) | ||
| if err != nil { | ||
| return "" | ||
| } | ||
| defer file.Close() | ||
|
|
||
| stat, err := file.Stat() | ||
| if err != nil { | ||
| return "" | ||
| } | ||
|
|
||
| fileSize := stat.Size() | ||
| var data []byte | ||
|
|
||
| if fileSize <= maxTailBytes { | ||
| // File is small, read it all | ||
| data, err = os.ReadFile(path) | ||
| if err != nil { | ||
| return "" | ||
| } | ||
| } else { | ||
| // File is large, seek to tail and read last N bytes | ||
| offset := fileSize - maxTailBytes | ||
| _, err = file.Seek(offset, 0) | ||
| if err != nil { | ||
| return "" | ||
| } | ||
|
|
||
| data = make([]byte, maxTailBytes) | ||
| n, err := file.Read(data) | ||
| if err != nil { | ||
| return "" | ||
| } | ||
| data = data[:n] | ||
|
|
||
| // Skip partial first line (we seeked into the middle of a line) | ||
| if idx := bytes.IndexByte(data, '\n'); idx >= 0 { | ||
| data = data[idx+1:] | ||
| } | ||
| } | ||
|
Comment on lines
+203
to
+250
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick | 🔵 Trivial Consider caching derived status to reduce I/O on list endpoints. The 64KB tail read optimization is good, but every call still opens and reads the session's event file.
For endpoints listing many sessions, this creates per-session I/O overhead. Consider caching the derived status (with TTL or invalidation on new events) to avoid repeated file reads for the same session within a short window. |
||
|
|
||
| lines := splitLines(data) | ||
|
Comment on lines
+203
to
+252
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Keep status reads on the same migration path as event replay. This function bypasses the loader that handles that migration. 🤖 Prompt for AI Agents |
||
|
|
||
| // Scan backwards. We only care about lifecycle and AskUserQuestion events. | ||
| // RUN_STARTED → "working" | |
| // RUN_FINISHED / RUN_ERROR → "idle", unless same run had AskUserQuestion | |
| // TOOL_CALL_START (AskUserQuestion) → "waiting_input" | |
| var runEndRunID string // set when we hit RUN_FINISHED/RUN_ERROR and need to look deeper | ||
| for i := len(lines) - 1; i >= 0; i-- { | ||
| if len(lines[i]) == 0 { | ||
| continue | ||
| } | ||
| var evt map[string]interface{} | ||
| if err := json.Unmarshal(lines[i], &evt); err != nil { | ||
| continue | ||
| } | ||
| evtType, _ := evt["type"].(string) | ||
|
|
||
| switch evtType { | ||
| case types.EventTypeRunStarted: | ||
| if runEndRunID != "" { | ||
| // We were scanning for an AskUserQuestion but hit RUN_STARTED first → idle | |
| return types.AgentStatusIdle | ||
| } | ||
| return types.AgentStatusWorking | ||
|
|
||
| case types.EventTypeRunFinished, types.EventTypeRunError: | ||
| if runEndRunID == "" { | ||
| // First run-end seen; scan deeper within this run for AskUserQuestion | ||
| runEndRunID, _ = evt["runId"].(string) | ||
| } | ||
|
|
||
| case types.EventTypeToolCallStart: | ||
| if runEndRunID != "" { | ||
| // Only relevant if we're scanning within the ended run | ||
| if evtRunID, _ := evt["runId"].(string); evtRunID != "" && evtRunID != runEndRunID { | ||
| return types.AgentStatusIdle | ||
| } | ||
| } | ||
| if toolName, _ := evt["toolCallName"].(string); isAskUserQuestionToolCall(toolName) { | ||
| return types.AgentStatusWaitingInput | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if runEndRunID != "" { | ||
| return types.AgentStatusIdle | ||
| } | ||
| return "" | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // ─── Compaction ────────────────────────────────────────────────────── | |
| // | ||
| // Go port of @ag-ui/client compactEvents. Concatenates streaming deltas | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 50381
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 5144
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 4352
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 6105
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 3936
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 371
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 483
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 3269
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 4377
🏁 Script executed:
Repository: ambient-code/platform
Length of output: 2506
Cross-project session collision in agent status derivation — use namespace-qualified keys in event store.
Sessions can be created with identical names in different projects if created within the same millisecond (e.g., `session-1709251234567`). The event store path uses only the bare session name — `{StateBaseDir}/sessions/{sessionName}/agui-events.jsonl` — without namespace qualification. This allows two projects' sessions with the same name to collide and read/write the same event file, causing `enrichAgentStatus()` to derive incorrect `agentStatus` for affected sessions.
To fix:
- Change the event store path to `{StateBaseDir}/sessions/{namespace}/{sessionName}/agui-events.jsonl`.
- Update `persistEvent()`, `loadEvents()`, `DeriveAgentStatus()`, and all callers to use the namespace-qualified path.
- Ensure `enrichAgentStatus()` passes both namespace and session name to `DeriveAgentStatusFromEvents()`.
Alternatively, use the session's UID instead of the name as the store key, since UIDs are globally unique within Kubernetes.
🤖 Prompt for AI Agents