Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions internal/coordinator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ type Server struct {
sseMu sync.Mutex
interrupts *InterruptLedger
approvalTracked map[string]time.Time
// nudgePending tracks agents that should be nudged (check-in triggered)
// when they next become idle. Set when a message arrives for an agent.
// The liveness loop picks it up on the next tick where the agent is idle.
// Keyed by "space/agent".
nudgePending map[string]time.Time
nudgeInFlight map[string]bool // prevents duplicate concurrent nudges
nudgeMu sync.Mutex
}

func NewServer(port, dataDir string) *Server {
Expand All @@ -58,6 +65,8 @@ func NewServer(port, dataDir string) *Server {
sseClients: make(map[*sseClient]struct{}),
interrupts: NewInterruptLedger(dataDir),
approvalTracked: make(map[string]time.Time),
nudgePending: make(map[string]time.Time),
nudgeInFlight: make(map[string]bool),
}
}

Expand Down Expand Up @@ -851,6 +860,12 @@ func (s *Server) handleAgentMessage(w http.ResponseWriter, r *http.Request, spac
})
s.broadcastSSE(spaceName, "agent_message", string(sseData))

// Mark agent for nudge — the liveness loop will trigger a check-in
// when the agent is next idle, so it reads the message via /raw.
s.nudgeMu.Lock()
s.nudgePending[spaceName+"/"+canonical] = time.Now()
s.nudgeMu.Unlock()

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]interface{}{
Expand Down Expand Up @@ -1593,12 +1608,52 @@ func (s *Server) checkAllSessionLiveness() {
m["prompt_text"] = e.prompt
}
payload[i] = m

// Check if this idle agent has a pending nudge
if e.exists && e.idle {
s.nudgeMu.Lock()
if _, pending := s.nudgePending[key]; pending {
if !s.nudgeInFlight[key] {
s.nudgeInFlight[key] = true
delete(s.nudgePending, key)
s.nudgeMu.Unlock()
go s.executeNudge(space, e.agent)
} else {
s.nudgeMu.Unlock()
}
} else {
s.nudgeMu.Unlock()
}
}
}
data, _ := json.Marshal(payload)
s.broadcastSSE(space, "tmux_liveness", string(data))
}
}

// executeNudge triggers a single-agent check-in for an agent that has
// pending messages. Called from the liveness loop when the agent is idle.
func (s *Server) executeNudge(spaceName, agentName string) {
key := spaceName + "/" + agentName
defer func() {
s.nudgeMu.Lock()
delete(s.nudgeInFlight, key)
s.nudgeMu.Unlock()
}()

s.logEvent(fmt.Sprintf("[%s/%s] auto-nudge: message pending, triggering check-in", spaceName, agentName))
result := s.SingleAgentCheckIn(spaceName, agentName, "", "")

if len(result.Errors) > 0 {
s.logEvent(fmt.Sprintf("[%s/%s] auto-nudge failed: %s", spaceName, agentName, result.Errors[0]))
} else if len(result.Sent) > 0 {
s.logEvent(fmt.Sprintf("[%s/%s] auto-nudge complete", spaceName, agentName))
}

sseData, _ := json.Marshal(result)
s.broadcastSSE(spaceName, "broadcast_complete", string(sseData))
}

func (s *Server) recordDecisionInterrupts(spaceName, agentName string, update *AgentUpdate) {
for _, q := range update.Questions {
ctx := map[string]string{}
Expand Down