diff --git a/internal/coordinator/server.go b/internal/coordinator/server.go
index cfb0b4d..2c9ca91 100644
--- a/internal/coordinator/server.go
+++ b/internal/coordinator/server.go
@@ -44,6 +44,13 @@ type Server struct {
 	sseMu sync.Mutex
 	interrupts *InterruptLedger
 	approvalTracked map[string]time.Time
+	// nudgePending records agents that need a nudge (a triggered check-in)
+	// the next time they are idle. handleAgentMessage sets the entry when a
+	// message arrives; checkAllSessionLiveness consumes it once the agent is
+	// seen idle. Keyed by "space/agent"; the value is the time it was marked.
+	nudgePending map[string]time.Time
+	nudgeInFlight map[string]bool // prevents duplicate concurrent nudges
+	nudgeMu sync.Mutex // guards nudgePending and nudgeInFlight
 }
 
 func NewServer(port, dataDir string) *Server {
@@ -58,6 +65,8 @@ func NewServer(port, dataDir string) *Server {
 		sseClients: make(map[*sseClient]struct{}),
 		interrupts: NewInterruptLedger(dataDir),
 		approvalTracked: make(map[string]time.Time),
+		nudgePending: make(map[string]time.Time),
+		nudgeInFlight: make(map[string]bool),
 	}
 }
 
@@ -851,6 +860,12 @@ func (s *Server) handleAgentMessage(w http.ResponseWriter, r *http.Request, spac
 	})
 	s.broadcastSSE(spaceName, "agent_message", string(sseData))
 
+	// Mark agent for nudge — the liveness loop will trigger a check-in
+	// when the agent is next idle, so it reads the message via /raw.
+	s.nudgeMu.Lock()
+	s.nudgePending[spaceName+"/"+canonical] = time.Now()
+	s.nudgeMu.Unlock()
+
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(map[string]interface{}{
@@ -1593,12 +1608,52 @@ func (s *Server) checkAllSessionLiveness() {
 				m["prompt_text"] = e.prompt
 			}
 			payload[i] = m
+
+			// Idle agent with a pending nudge: claim it under nudgeMu, then run the check-in in a goroutine.
+			if e.exists && e.idle {
+				s.nudgeMu.Lock()
+				if _, pending := s.nudgePending[key]; pending {
+					if !s.nudgeInFlight[key] {
+						s.nudgeInFlight[key] = true
+						delete(s.nudgePending, key)
+						s.nudgeMu.Unlock()
+						go s.executeNudge(space, e.agent)
+					} else {
+						s.nudgeMu.Unlock()
+					}
+				} else {
+					s.nudgeMu.Unlock()
+				}
+			}
 		}
 		data, _ := json.Marshal(payload)
 		s.broadcastSSE(space, "tmux_liveness", string(data))
 	}
 }
 
+// executeNudge runs a single-agent check-in for an agent with a pending message (started by the
+// liveness loop once the agent is idle), logs the outcome, and broadcasts the result over SSE.
+func (s *Server) executeNudge(spaceName, agentName string) {
+	key := spaceName + "/" + agentName
+	defer func() { // release the in-flight marker when the nudge finishes
+		s.nudgeMu.Lock()
+		delete(s.nudgeInFlight, key)
+		s.nudgeMu.Unlock()
+	}()
+
+	s.logEvent(fmt.Sprintf("[%s/%s] auto-nudge: message pending, triggering check-in", spaceName, agentName))
+	result := s.SingleAgentCheckIn(spaceName, agentName, "", "")
+
+	if len(result.Errors) > 0 {
+		s.logEvent(fmt.Sprintf("[%s/%s] auto-nudge failed: %s", spaceName, agentName, result.Errors[0]))
+	} else if len(result.Sent) > 0 {
+		s.logEvent(fmt.Sprintf("[%s/%s] auto-nudge complete", spaceName, agentName))
+	}
+
+	sseData, _ := json.Marshal(result) // marshal error is discarded
+	s.broadcastSSE(spaceName, "broadcast_complete", string(sseData))
+}
+
 func (s *Server) recordDecisionInterrupts(spaceName, agentName string, update *AgentUpdate) {
 	for _, q := range update.Questions {
 		ctx := map[string]string{}