Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions components/backend/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var (
GetGitHubToken func(context.Context, kubernetes.Interface, dynamic.Interface, string, string) (string, error)
GetGitLabToken func(context.Context, kubernetes.Interface, string, string) (string, error)
DeriveRepoFolderFromURL func(string) string
// DeriveAgentStatusFromEvents derives agentStatus from the persisted event log.
// Set by the websocket package at init to avoid circular imports.
DeriveAgentStatusFromEvents func(sessionName string) string
Comment on lines +47 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use a namespaced key for derived agent status.

DeriveAgentStatusFromEvents only receives sessionName, but session names are namespace-scoped and this file generates them from session-<unix seconds>. Two projects can legitimately share the same name, so list/detail can pick up another project's waiting/working state. Please thread project/namespace or a stable session UID through this lookup.

Suggested fix
-	DeriveAgentStatusFromEvents func(sessionName string) string
+	DeriveAgentStatusFromEvents func(project, sessionName string) string
...
-func enrichAgentStatus(session *types.AgenticSession) {
+func enrichAgentStatus(project string, session *types.AgenticSession) {
 	if session.Status == nil || session.Status.Phase != "Running" {
 		return
 	}
 	if DeriveAgentStatusFromEvents == nil {
 		return
 	}
 	name, _ := session.Metadata["name"].(string)
 	if name == "" {
 		return
 	}
-	if derived := DeriveAgentStatusFromEvents(name); derived != "" {
+	if derived := DeriveAgentStatusFromEvents(project, name); derived != "" {
 		session.Status.AgentStatus = types.StringPtr(derived)
 	}
 }
...
-		enrichAgentStatus(&paginatedSessions[i])
+		enrichAgentStatus(project, &paginatedSessions[i])
...
-	enrichAgentStatus(&session)
+	enrichAgentStatus(project, &session)

Also applies to: 367-383, 456-459, 933-934

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/handlers/sessions.go` around lines 44 - 46,
DeriveAgentStatusFromEvents currently accepts only sessionName which is not
namespace-scoped; change its signature and all call sites to accept a namespaced
identifier (either project/namespace + sessionName or a stable sessionUID) and
use that namespaced key when deriving agent status; update the function
declaration DeriveAgentStatusFromEvents and callers in this file (and the other
affected locations referenced around lines 367-383, 456-459, 933-934) to pass
the extra project/namespace or sessionUID parameter so lookups are isolated per
project/namespace.

// LEGACY: SendMessageToSession removed - AG-UI server uses HTTP/SSE instead of WebSocket
)

Expand Down Expand Up @@ -364,6 +367,25 @@ func parseStatus(status map[string]interface{}) *types.AgenticSessionStatus {

// V2 API Handlers - Multi-tenant session management

// enrichAgentStatus derives agentStatus from the persisted event log for
// Running sessions. This is the source of truth — it replaces the stale
// CR-cached value which was subject to goroutine race conditions.
func enrichAgentStatus(session *types.AgenticSession) {
if session.Status == nil || session.Status.Phase != "Running" {
return
}
if DeriveAgentStatusFromEvents == nil {
return
}
name, _ := session.Metadata["name"].(string)
if name == "" {
return
}
if derived := DeriveAgentStatusFromEvents(name); derived != "" {
session.Status.AgentStatus = types.StringPtr(derived)
}
}

func ListSessions(c *gin.Context) {
project := c.GetString("project")

Expand Down Expand Up @@ -447,6 +469,11 @@ func ListSessions(c *gin.Context) {
totalCount := len(sessions)
paginatedSessions, hasMore, nextOffset := paginateSessions(sessions, params.Offset, params.Limit)

// Derive agentStatus from event log only for paginated sessions (performance optimization)
for i := range paginatedSessions {
enrichAgentStatus(&paginatedSessions[i])
}

response := types.PaginatedResponse{
Items: paginatedSessions,
TotalCount: totalCount,
Expand Down Expand Up @@ -919,6 +946,9 @@ func GetSession(c *gin.Context) {
session.Status = parseStatus(status)
}

// Derive agentStatus from event log (source of truth) for running sessions
enrichAgentStatus(&session)

session.AutoBranch = ComputeAutoBranch(sessionName)

c.JSON(http.StatusOK, session)
Expand Down
1 change: 1 addition & 0 deletions components/backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func main() {

// Initialize websocket package
websocket.StateBaseDir = server.StateBaseDir
handlers.DeriveAgentStatusFromEvents = websocket.DeriveAgentStatus

// Normal server mode
if err := server.Run(registerRoutes); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions components/backend/types/agui.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ const (
EventTypeMeta = "META"
)

// Agent status values derived from the AG-UI event stream.
const (
AgentStatusWorking = "working"
AgentStatusIdle = "idle"
AgentStatusWaitingInput = "waiting_input"
)

// AG-UI Message Roles
// See: https://docs.ag-ui.com/concepts/messages
const (
Expand Down
1 change: 1 addition & 0 deletions components/backend/types/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type AgenticSessionStatus struct {
StartTime *string `json:"startTime,omitempty"`
CompletionTime *string `json:"completionTime,omitempty"`
LastActivityTime *string `json:"lastActivityTime,omitempty"`
AgentStatus *string `json:"agentStatus,omitempty"`
StoppedReason *string `json:"stoppedReason,omitempty"`
ReconciledRepos []ReconciledRepo `json:"reconciledRepos,omitempty"`
ReconciledWorkflow *ReconciledWorkflow `json:"reconciledWorkflow,omitempty"`
Expand Down
21 changes: 19 additions & 2 deletions components/backend/websocket/agui_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,19 @@ func persistStreamedEvent(sessionID, runID, threadID, jsonData string) {

persistEvent(sessionID, event)

// Update lastActivityTime on CR for activity events (debounced).
// Extract event type to check; projectName is derived from the
// Extract event type; projectName is derived from the
// sessionID-to-project mapping populated by HandleAGUIRunProxy.
eventType, _ := event["type"].(string)

// Update lastActivityTime on CR for activity events (debounced).
if isActivityEvent(eventType) {
if projectName, ok := sessionProjectMap.Load(sessionID); ok {
updateLastActivityTime(projectName.(string), sessionID, eventType == types.EventTypeRunStarted)
Comment on lines +442 to 449
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use a project-scoped key here, not sessionID alone.

Line 445 recovers projectName from sessionProjectMap using only sessionID, but that map is populated by sessionName. If two projects have the same session name, the later run overwrites the earlier entry and activity from one session can update lastActivityTime on the wrong CR. That will make the wrong session appear active while the real one goes stale.

Suggested direction
- go proxyRunnerStream(runnerURL, bodyBytes, sessionName, runID, threadID)
+ go proxyRunnerStream(runnerURL, bodyBytes, projectName, sessionName, runID, threadID)

- func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, threadID string) {
+ func proxyRunnerStream(runnerURL string, bodyBytes []byte, projectName, sessionName, runID, threadID string) {

-   persistStreamedEvent(sessionName, runID, threadID, jsonData)
+   persistStreamedEvent(projectName, sessionName, runID, threadID, jsonData)

- func persistStreamedEvent(sessionID, runID, threadID, jsonData string) {
+ func persistStreamedEvent(projectName, sessionID, runID, threadID, jsonData string) {

-   if projectName, ok := sessionProjectMap.Load(sessionID); ok {
-     updateLastActivityTime(projectName.(string), sessionID, eventType == types.EventTypeRunStarted)
-   }
+   updateLastActivityTime(projectName, sessionID, eventType == types.EventTypeRunStarted)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_proxy.go` around lines 439 - 446, The bug
is that sessionProjectMap is being looked up by sessionID alone
(sessionProjectMap.Load(sessionID) in the activity branch) but the map is
populated using a project-scoped key (sessionName/projectName), so entries from
different projects can collide; fix by using the same project-scoped key format
everywhere: derive the composite key (the same key format used in
HandleAGUIRunProxy/session registration) when loading sessionProjectMap and when
updating lastActivityTime (keep updateLastActivityTime(projectName, sessionID,
... ) as-is but ensure you retrieve the correct projectName by loading
sessionProjectMap with the composite key rather than plain sessionID); update
any registration code to use and document that composite key consistently.

}
}

// agentStatus is derived at query time from the event log (DeriveAgentStatus).
// No CR updates needed here — the persisted events ARE the source of truth.
}

// ─── POST /agui/interrupt ────────────────────────────────────────────
Expand Down Expand Up @@ -948,3 +952,16 @@ func updateLastActivityTime(projectName, sessionName string, immediate bool) {
}
}()
}

// isAskUserQuestionToolCall checks if a tool call name is the AskUserQuestion HITL tool.
// Uses case-insensitive comparison after stripping non-alpha characters,
// matching the frontend pattern in use-agent-status.ts.
func isAskUserQuestionToolCall(name string) bool {
var clean strings.Builder
for _, r := range strings.ToLower(name) {
if r >= 'a' && r <= 'z' {
clean.WriteRune(r)
}
}
return clean.String() == "askuserquestion"
}
103 changes: 103 additions & 0 deletions components/backend/websocket/agui_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package websocket
import (
"ambient-code-backend/types"
"bufio"
"bytes"
"encoding/json"
"fmt"
"log"
Expand Down Expand Up @@ -210,6 +211,108 @@ func loadEvents(sessionID string) []map[string]interface{} {
return events
}

// DeriveAgentStatus reads a session's event log and returns the agent
// status derived from the last significant events.
//
// Returns "" if the status cannot be determined (no events, file missing, etc.).
func DeriveAgentStatus(sessionID string) string {
path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID)

// Read only the tail of the file to avoid loading entire event log into memory.
// 64KB is sufficient for recent lifecycle events (scanning backwards).
const maxTailBytes = 64 * 1024

file, err := os.Open(path)
if err != nil {
return ""
}
defer file.Close()

stat, err := file.Stat()
if err != nil {
return ""
}

fileSize := stat.Size()
var data []byte

if fileSize <= maxTailBytes {
// File is small, read it all
data, err = os.ReadFile(path)
if err != nil {
return ""
}
Comment on lines +239 to +244
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Minor inefficiency: file opened twice for small files.

The file is already open from line 209, but for small files, os.ReadFile is called which opens the file again. Consider reading from the already-open file handle instead.

♻️ Proposed fix to read from existing handle
 	if fileSize <= maxTailBytes {
-		// File is small, read it all
-		data, err = os.ReadFile(path)
-		if err != nil {
-			return ""
-		}
+		// File is small, read it all from the open handle
+		data = make([]byte, fileSize)
+		_, err = file.Read(data)
+		if err != nil {
+			return ""
+		}
 	} else {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/websocket/agui_store.go` around lines 223 - 228, The code
opens a file earlier but still calls os.ReadFile(path) when fileSize <=
maxTailBytes, causing a second open; replace that call by reading from the
already-open file handle (the variable used to open the file earlier, e.g. f) —
seek to the beginning if needed and use io.ReadAll(f) (or equivalent) to
populate data, handle errors the same way, and avoid reopening the file; update
the branch that currently calls os.ReadFile(path) to use the existing handle and
remove the redundant open.

} else {
// File is large, seek to tail and read last N bytes
offset := fileSize - maxTailBytes
_, err = file.Seek(offset, 0)
if err != nil {
return ""
}

data = make([]byte, maxTailBytes)
n, err := file.Read(data)
if err != nil {
return ""
}
data = data[:n]

// Skip partial first line (we seeked into the middle of a line)
if idx := bytes.IndexByte(data, '\n'); idx >= 0 {
data = data[idx+1:]
}
}

lines := splitLines(data)

// Scan backwards. We only care about lifecycle and AskUserQuestion events.
// RUN_STARTED → "working"
// RUN_FINISHED / RUN_ERROR → "idle", unless same run had AskUserQuestion
// TOOL_CALL_START (AskUserQuestion) → "waiting_input"
var runEndRunID string // set when we hit RUN_FINISHED/RUN_ERROR and need to look deeper
for i := len(lines) - 1; i >= 0; i-- {
if len(lines[i]) == 0 {
continue
}
var evt map[string]interface{}
if err := json.Unmarshal(lines[i], &evt); err != nil {
continue
}
evtType, _ := evt["type"].(string)

switch evtType {
case types.EventTypeRunStarted:
if runEndRunID != "" {
// We were scanning for an AskUserQuestion but hit RUN_STARTED first → idle
return types.AgentStatusIdle
}
return types.AgentStatusWorking

case types.EventTypeRunFinished, types.EventTypeRunError:
if runEndRunID == "" {
// First run-end seen; scan deeper within this run for AskUserQuestion
runEndRunID, _ = evt["runId"].(string)
}

case types.EventTypeToolCallStart:
if runEndRunID != "" {
// Only relevant if we're scanning within the ended run
if evtRunID, _ := evt["runId"].(string); evtRunID != "" && evtRunID != runEndRunID {
return types.AgentStatusIdle
}
}
if toolName, _ := evt["toolCallName"].(string); isAskUserQuestionToolCall(toolName) {
return types.AgentStatusWaitingInput
}
}
}

if runEndRunID != "" {
return types.AgentStatusIdle
}
return ""
}

// ─── Compaction ──────────────────────────────────────────────────────
//
// Go port of @ag-ui/client compactEvents. Concatenates streaming deltas
Expand Down
Loading
Loading