Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ dmypy.json

# Claude Code
.claude/settings.local.json
.claude/worktrees/

# mkdocs
/site
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -734,9 +734,9 @@ kind-port-forward: check-kubectl check-local-context ## Port-forward kind servic
@echo ""
@echo "$(COLOR_YELLOW)Press Ctrl+C to stop$(COLOR_RESET)"
@echo ""
@trap 'kill 0; echo ""; echo "$(COLOR_GREEN)✓$(COLOR_RESET) Port forwarding stopped"; exit 0' INT; \
kubectl port-forward -n $(NAMESPACE) svc/frontend-service $(KIND_FWD_FRONTEND_PORT):3000 >/dev/null 2>&1 & \
kubectl port-forward -n $(NAMESPACE) svc/backend-service $(KIND_FWD_BACKEND_PORT):8080 >/dev/null 2>&1 & \
@trap 'echo ""; echo "$(COLOR_GREEN)✓$(COLOR_RESET) Port forwarding stopped"; exit 0' INT; \
(kubectl port-forward -n ambient-code svc/frontend-service $(KIND_FWD_FRONTEND_PORT):3000 >/dev/null 2>&1 &); \
(kubectl port-forward -n ambient-code svc/backend-service $(KIND_FWD_BACKEND_PORT):8080 >/dev/null 2>&1 &); \
wait

dev-bootstrap: check-kubectl check-local-context ## Bootstrap developer workspace with API key and integrations
Expand Down
38 changes: 2 additions & 36 deletions components/backend/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ var (
GetGitHubToken func(context.Context, kubernetes.Interface, dynamic.Interface, string, string) (string, error)
GetGitLabToken func(context.Context, kubernetes.Interface, string, string) (string, error)
DeriveRepoFolderFromURL func(string) string
// DeriveAgentStatusFromEvents derives agentStatus from the persisted event log.
// Set by the websocket package at init to avoid circular imports.
// sessionID should be namespace-qualified (e.g., "namespace/sessionName") to avoid cross-project collisions.
DeriveAgentStatusFromEvents func(sessionID string) string
// LEGACY: SendMessageToSession removed - AG-UI server uses HTTP/SSE instead of WebSocket
)

Expand Down Expand Up @@ -365,28 +361,6 @@ func parseStatus(status map[string]interface{}) *types.AgenticSessionStatus {

// V2 API Handlers - Multi-tenant session management

// enrichAgentStatus derives agentStatus from the persisted event log for
// Running sessions. This is the source of truth — it replaces the stale
// CR-cached value which was subject to goroutine race conditions.
func enrichAgentStatus(session *types.AgenticSession) {
if session.Status == nil || session.Status.Phase != "Running" {
return
}
if DeriveAgentStatusFromEvents == nil {
return
}
name, _ := session.Metadata["name"].(string)
namespace, _ := session.Metadata["namespace"].(string)
if name == "" || namespace == "" {
return
}
// Use namespace-qualified key to avoid cross-project collisions in the event store
sessionID := namespace + "/" + name
if derived := DeriveAgentStatusFromEvents(sessionID); derived != "" {
session.Status.AgentStatus = types.StringPtr(derived)
}
}

func ListSessions(c *gin.Context) {
project := c.GetString("project")

Expand Down Expand Up @@ -457,11 +431,6 @@ func ListSessions(c *gin.Context) {
totalCount := len(sessions)
paginatedSessions, hasMore, nextOffset := paginateSessions(sessions, params.Offset, params.Limit)

// Derive agentStatus from event log only for paginated sessions (performance optimization)
for i := range paginatedSessions {
enrichAgentStatus(&paginatedSessions[i])
}

response := types.PaginatedResponse{
Items: paginatedSessions,
TotalCount: totalCount,
Expand Down Expand Up @@ -676,9 +645,9 @@ func CreateSession(c *gin.Context) {
timeout = *req.Timeout
}

// Generate unique name (millisecond timestamp for burst-creation safety)
// Generate unique name (timestamp-based)
// Note: Runner will create branch as "ambient/{session-name}"
timestamp := time.Now().UnixMilli()
timestamp := time.Now().Unix()
name := fmt.Sprintf("session-%d", timestamp)

// Create the custom resource
Expand Down Expand Up @@ -934,9 +903,6 @@ func GetSession(c *gin.Context) {
session.Status = parseStatus(status)
}

// Derive agentStatus from event log (source of truth) for running sessions
enrichAgentStatus(&session)

session.AutoBranch = ComputeAutoBranch(sessionName)

c.JSON(http.StatusOK, session)
Expand Down
1 change: 0 additions & 1 deletion components/backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ func main() {

// Initialize websocket package
websocket.StateBaseDir = server.StateBaseDir
handlers.DeriveAgentStatusFromEvents = websocket.DeriveAgentStatus

// Normal server mode
if err := server.Run(registerRoutes); err != nil {
Expand Down
7 changes: 0 additions & 7 deletions components/backend/types/agui.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ const (
EventTypeMeta = "META"
)

// Agent status values derived from the AG-UI event stream.
const (
AgentStatusWorking = "working"
AgentStatusIdle = "idle"
AgentStatusWaitingInput = "waiting_input"
)

// AG-UI Message Roles
// See: https://docs.ag-ui.com/concepts/messages
const (
Expand Down
1 change: 0 additions & 1 deletion components/backend/types/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type AgenticSessionStatus struct {
StartTime *string `json:"startTime,omitempty"`
CompletionTime *string `json:"completionTime,omitempty"`
LastActivityTime *string `json:"lastActivityTime,omitempty"`
AgentStatus *string `json:"agentStatus,omitempty"`
StoppedReason *string `json:"stoppedReason,omitempty"`
ReconciledRepos []ReconciledRepo `json:"reconciledRepos,omitempty"`
ReconciledWorkflow *ReconciledWorkflow `json:"reconciledWorkflow,omitempty"`
Expand Down
44 changes: 11 additions & 33 deletions components/backend/websocket/agui_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,10 @@ func HandleAGUIRunProxy(c *gin.Context) {

log.Printf("AGUI Proxy: run=%s session=%s/%s msgs=%d", truncID(runID), projectName, sessionName, len(rawMessages))

// Use namespace-qualified session ID to avoid cross-project collisions
namespacedSessionID := projectName + "/" + sessionName

sessionLastSeen.Store(sessionName, time.Now())

// Store project→session mapping for activity tracking in persistStreamedEvent
sessionProjectMap.Store(namespacedSessionID, projectName)
sessionProjectMap.Store(sessionName, projectName)

// Resolve and cache the runner port for this session from the registry.
cacheSessionPort(projectName, sessionName)
Expand Down Expand Up @@ -300,7 +297,7 @@ func HandleAGUIRunProxy(c *gin.Context) {
runnerURL := getRunnerEndpoint(projectName, sessionName)

// Start background goroutine to proxy runner SSE → persist + broadcast
go proxyRunnerStream(runnerURL, bodyBytes, sessionName, namespacedSessionID, runID, threadID)
go proxyRunnerStream(runnerURL, bodyBytes, sessionName, runID, threadID)

// Return metadata immediately — events arrive via GET /agui/events
c.JSON(http.StatusOK, gin.H{
Expand All @@ -312,22 +309,21 @@ func HandleAGUIRunProxy(c *gin.Context) {
// proxyRunnerStream connects to the runner's SSE endpoint, reads events,
// persists them, and publishes them to the live broadcast pipe. Runs in
// a background goroutine so the POST /agui/run handler can return immediately.
// namespacedSessionID is the namespace-qualified session ID (e.g., "namespace/sessionName") for event persistence.
func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, namespacedSessionID, runID, threadID string) {
func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, threadID string) {
log.Printf("AGUI Proxy: connecting to runner at %s", runnerURL)
resp, err := connectToRunner(runnerURL, bodyBytes)
if err != nil {
log.Printf("AGUI Proxy: runner unavailable for %s: %v", sessionName, err)
// Publish error events so GET /agui/events subscribers see the failure
publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, "Runner is not available")
publishAndPersistErrorEvents(sessionName, runID, threadID, "Runner is not available")
return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.Printf("AGUI Proxy: runner returned %d: %s", resp.StatusCode, string(body))
publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode))
publishAndPersistErrorEvents(sessionName, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode))
return
}

Expand All @@ -347,7 +343,7 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, namespac
// Persist every data event to JSONL
if strings.HasPrefix(trimmed, "data: ") {
jsonData := strings.TrimPrefix(trimmed, "data: ")
persistStreamedEvent(namespacedSessionID, runID, threadID, jsonData)
persistStreamedEvent(sessionName, runID, threadID, jsonData)
}

// Publish raw SSE line to all GET /agui/events subscribers
Expand All @@ -360,15 +356,14 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, namespac
// publishAndPersistErrorEvents generates RUN_STARTED + RUN_ERROR events,
// persists them, and publishes to the live broadcast so subscribers get
// notified of runner failures.
// sessionName is used for broadcasting; namespacedSessionID is used for persistence.
func publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, message string) {
func publishAndPersistErrorEvents(sessionName, runID, threadID, message string) {
// RUN_STARTED
startEvt := map[string]interface{}{
"type": "RUN_STARTED",
"threadId": threadID,
"runId": runID,
}
persistEvent(namespacedSessionID, startEvt)
persistEvent(sessionName, startEvt)
startData, _ := json.Marshal(startEvt)
publishLine(sessionName, fmt.Sprintf("data: %s\n\n", startData))

Expand All @@ -379,7 +374,7 @@ func publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threa
"threadId": threadID,
"runId": runID,
}
persistEvent(namespacedSessionID, errEvt)
persistEvent(sessionName, errEvt)
errData, _ := json.Marshal(errEvt)
publishLine(sessionName, fmt.Sprintf("data: %s\n\n", errData))
}
Expand Down Expand Up @@ -441,19 +436,15 @@ func persistStreamedEvent(sessionID, runID, threadID, jsonData string) {

persistEvent(sessionID, event)

// Extract event type; projectName is derived from the
// Update lastActivityTime on CR for activity events (debounced).
// Extract event type to check; projectName is derived from the
// sessionID-to-project mapping populated by HandleAGUIRunProxy.
eventType, _ := event["type"].(string)

// Update lastActivityTime on CR for activity events (debounced).
if isActivityEvent(eventType) {
if projectName, ok := sessionProjectMap.Load(sessionID); ok {
updateLastActivityTime(projectName.(string), sessionID, eventType == types.EventTypeRunStarted)
}
}

// agentStatus is derived at query time from the event log (DeriveAgentStatus).
// No CR updates needed here — the persisted events ARE the source of truth.
}

// ─── POST /agui/interrupt ────────────────────────────────────────────
Expand Down Expand Up @@ -954,16 +945,3 @@ func updateLastActivityTime(projectName, sessionName string, immediate bool) {
}
}()
}

// isAskUserQuestionToolCall checks if a tool call name is the AskUserQuestion HITL tool.
// Uses case-insensitive comparison after stripping non-alpha characters,
// matching the frontend pattern in use-agent-status.ts.
func isAskUserQuestionToolCall(name string) bool {
var clean strings.Builder
for _, r := range strings.ToLower(name) {
if r >= 'a' && r <= 'z' {
clean.WriteRune(r)
}
}
return clean.String() == "askuserquestion"
}
105 changes: 0 additions & 105 deletions components/backend/websocket/agui_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package websocket

import (
"ambient-code-backend/types"
"bytes"
"encoding/json"
"fmt"
"log"
Expand Down Expand Up @@ -195,110 +194,6 @@ func loadEvents(sessionID string) []map[string]interface{} {
return events
}

// DeriveAgentStatus reads a session's event log and returns the agent
// status derived from the last significant events.
//
// sessionID should be namespace-qualified (e.g., "namespace/sessionName") to avoid cross-project collisions.
// Returns "" if the status cannot be determined (no events, file missing, etc.).
func DeriveAgentStatus(sessionID string) string {
// sessionID is now namespace-qualified, e.g., "default/session-123"
path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID)

// Read only the tail of the file to avoid loading entire event log into memory.
// 64KB is sufficient for recent lifecycle events (scanning backwards).
const maxTailBytes = 64 * 1024

file, err := os.Open(path)
if err != nil {
return ""
}
defer file.Close()

stat, err := file.Stat()
if err != nil {
return ""
}

fileSize := stat.Size()
var data []byte

if fileSize <= maxTailBytes {
// File is small, read it all
data, err = os.ReadFile(path)
if err != nil {
return ""
}
} else {
// File is large, seek to tail and read last N bytes
offset := fileSize - maxTailBytes
_, err = file.Seek(offset, 0)
if err != nil {
return ""
}

data = make([]byte, maxTailBytes)
n, err := file.Read(data)
if err != nil {
return ""
}
data = data[:n]

// Skip partial first line (we seeked into the middle of a line)
if idx := bytes.IndexByte(data, '\n'); idx >= 0 {
data = data[idx+1:]
}
}

lines := splitLines(data)

// Scan backwards. We only care about lifecycle and AskUserQuestion events.
// RUN_STARTED → "working"
// RUN_FINISHED / RUN_ERROR → "idle", unless same run had AskUserQuestion
// TOOL_CALL_START (AskUserQuestion) → "waiting_input"
var runEndRunID string // set when we hit RUN_FINISHED/RUN_ERROR and need to look deeper
for i := len(lines) - 1; i >= 0; i-- {
if len(lines[i]) == 0 {
continue
}
var evt map[string]interface{}
if err := json.Unmarshal(lines[i], &evt); err != nil {
continue
}
evtType, _ := evt["type"].(string)

switch evtType {
case types.EventTypeRunStarted:
if runEndRunID != "" {
// We were scanning for an AskUserQuestion but hit RUN_STARTED first → idle
return types.AgentStatusIdle
}
return types.AgentStatusWorking

case types.EventTypeRunFinished, types.EventTypeRunError:
if runEndRunID == "" {
// First run-end seen; scan deeper within this run for AskUserQuestion
runEndRunID, _ = evt["runId"].(string)
}

case types.EventTypeToolCallStart:
if runEndRunID != "" {
// Only relevant if we're scanning within the ended run
if evtRunID, _ := evt["runId"].(string); evtRunID != "" && evtRunID != runEndRunID {
return types.AgentStatusIdle
}
}
if toolName, _ := evt["toolCallName"].(string); isAskUserQuestionToolCall(toolName) {
return types.AgentStatusWaitingInput
}
}
}

if runEndRunID != "" {
return types.AgentStatusIdle
}
return ""
}

// ─── Compaction ──────────────────────────────────────────────────────
//
// Go port of @ag-ui/client compactEvents. Concatenates streaming deltas
Expand Down
Loading
Loading