Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions pkg/agent/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,15 @@ func (cb *ContextBuilder) BuildMessages(
SystemParts: contentBlocks,
})

// Add conversation history
messages = append(messages, history...)
// Add conversation history, annotating messages that have threading IDs
// so the LLM can navigate thread structure from persisted sessions.
for _, msg := range history {
annotated := msg
if prefix := messageThreadAnnotation(msg); prefix != "" {
annotated.Content = prefix + msg.Content
}
messages = append(messages, annotated)
}

// Add current user message
if strings.TrimSpace(currentMessage) != "" {
Expand Down Expand Up @@ -750,3 +757,27 @@ func (cb *ContextBuilder) GetSkillsInfo() map[string]any {
"names": skillNames,
}
}

// messageThreadAnnotation returns the thread annotation prefix for a message,
// e.g. "[msg:#5, reply_to:#3] " or "" if the message has no threading IDs.
func messageThreadAnnotation(msg providers.Message) string {
msgIDs := msg.MessageIDs
formattedIDs := strings.Join(msgIDs, ",#")
if formattedIDs != "" {
formattedIDs = "#" + formattedIDs
}
switch {
case len(msgIDs) > 1 && msg.ReplyToMessageID != "":
return fmt.Sprintf("[msgs:%s, reply_to:#%s] ", formattedIDs, msg.ReplyToMessageID)
case len(msgIDs) > 1:
return fmt.Sprintf("[msgs:%s] ", formattedIDs)
case len(msgIDs) == 1 && msg.ReplyToMessageID != "":
return fmt.Sprintf("[msg:%s, reply_to:#%s] ", formattedIDs, msg.ReplyToMessageID)
case len(msgIDs) == 1:
return fmt.Sprintf("[msg:%s] ", formattedIDs)
case msg.ReplyToMessageID != "":
return fmt.Sprintf("[reply_to:#%s] ", msg.ReplyToMessageID)
default:
return ""
}
}
195 changes: 145 additions & 50 deletions pkg/agent/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,60 @@ type AgentLoop struct {

// processOptions configures how a message is processed
type processOptions struct {
SessionKey string // Session identifier for history/context
Channel string // Target channel for tool execution
ChatID string // Target chat ID for tool execution
SenderID string // Current sender ID for dynamic context
SenderDisplayName string // Current sender display name for dynamic context
UserMessage string // User message content (may include prefix)
Media []string // media:// refs from inbound message
DefaultResponse string // Response when LLM returns empty
EnableSummary bool // Whether to trigger summarization
SendResponse bool // Whether to send response via bus
NoHistory bool // If true, don't load session history (for heartbeat)
SessionKey string // Session identifier for history/context
Channel string // Target channel for tool execution
ChatID string // Target chat ID for tool execution
SenderID string // Current sender ID for dynamic context
SenderDisplayName string // Current sender display name for dynamic context
UserMessage string // User message content (may include prefix)
Media []string // media:// refs from inbound message
DefaultResponse string // Response when LLM returns empty
EnableSummary bool // Whether to trigger summarization
SendResponse bool // Whether to send response via bus
NoHistory bool // If true, don't load session history (for heartbeat)
MessageID string // Inbound platform message ID (for threading)
ReplyToMessageID string // Parent message ID from inbound (for threading)
Sender *providers.MessageSender // Author identity (nil for system/automated messages)
}

type agentResponse struct {
Content string
Channel string
ChatID string
OnDelivered func(msgIDs []string)
}

func (r agentResponse) outboundMessage(defaultChannel, defaultChatID string) bus.OutboundMessage {
channel := r.Channel
if channel == "" {
channel = defaultChannel
}
chatID := r.ChatID
if chatID == "" {
chatID = defaultChatID
}
return bus.OutboundMessage{
Channel: channel,
ChatID: chatID,
Content: r.Content,
OnDelivered: r.OnDelivered,
}
}

func singleMessageIDs(msgID string) []string {
if msgID == "" {
return nil
}
return []string{msgID}
}

func cloneMessageIDs(msgIDs []string) []string {
if len(msgIDs) == 0 {
return nil
}
cloned := make([]string, len(msgIDs))
copy(cloned, msgIDs)
return cloned
}

const (
Expand All @@ -76,6 +119,7 @@ const (
metadataKeyTeamID = "team_id"
metadataKeyParentPeerKind = "parent_peer_kind"
metadataKeyParentPeerID = "parent_peer_id"
metadataKeyReplyToMessage = "reply_to_message_id"
)

func NewAgentLoop(
Expand Down Expand Up @@ -287,10 +331,14 @@ func (al *AgentLoop) Run(ctx context.Context) error {

response, err := al.processMessage(ctx, msg)
if err != nil {
response = fmt.Sprintf("Error processing message: %v", err)
response = agentResponse{
Content: fmt.Sprintf("Error processing message: %v", err),
Channel: msg.Channel,
ChatID: msg.ChatID,
}
}

if response != "" {
if response.Content != "" {
// Check if the message tool already sent a response during this round.
// If so, skip publishing to avoid duplicate messages to the user.
// Use default agent's tools to check (message tool is shared).
Expand All @@ -305,18 +353,18 @@ func (al *AgentLoop) Run(ctx context.Context) error {
}

if !alreadySent {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: msg.Channel,
ChatID: msg.ChatID,
Content: response,
})
outbound := response.outboundMessage(msg.Channel, msg.ChatID)
al.bus.PublishOutbound(ctx, outbound)
logger.InfoCF("agent", "Published outbound response",
map[string]any{
"channel": msg.Channel,
"chat_id": msg.ChatID,
"content_len": len(response),
"channel": outbound.Channel,
"chat_id": outbound.ChatID,
"content_len": len(response.Content),
})
} else {
if response.OnDelivered != nil {
response.OnDelivered(nil)
}
logger.DebugCF(
"agent",
"Skipped outbound (message tool already sent)",
Expand Down Expand Up @@ -661,7 +709,14 @@ func (al *AgentLoop) ProcessDirectWithChannel(
SessionKey: sessionKey,
}

return al.processMessage(ctx, msg)
response, err := al.processMessage(ctx, msg)
if err != nil {
return "", err
}
if response.OnDelivered != nil {
response.OnDelivered(nil)
}
return response.Content, nil
}

// ProcessHeartbeat processes a heartbeat request without session history.
Expand All @@ -674,7 +729,7 @@ func (al *AgentLoop) ProcessHeartbeat(
if agent == nil {
return "", fmt.Errorf("no default agent for heartbeat")
}
return al.runAgentLoop(ctx, agent, processOptions{
response, err := al.runAgentLoop(ctx, agent, processOptions{
SessionKey: "heartbeat",
Channel: channel,
ChatID: chatID,
Expand All @@ -684,9 +739,16 @@ func (al *AgentLoop) ProcessHeartbeat(
SendResponse: false,
NoHistory: true, // Don't load session history for heartbeat
})
if err != nil {
return "", err
}
if response.OnDelivered != nil {
response.OnDelivered(nil)
}
return response.Content, nil
}

func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) {
func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (agentResponse, error) {
// Add message preview to log (show full content for error messages)
var logContent string
if strings.Contains(msg.Content, "Error:") || strings.Contains(msg.Content, "error") {
Expand Down Expand Up @@ -721,7 +783,7 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)

route, agent, routeErr := al.resolveMessageRoute(msg)
if routeErr != nil {
return "", routeErr
return agentResponse{}, routeErr
}

// Reset message-tool state for this round so we don't skip publishing due to a previous round.
Expand Down Expand Up @@ -756,12 +818,19 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage)
DefaultResponse: defaultResponse,
EnableSummary: true,
SendResponse: false,
MessageID: msg.MessageID,
ReplyToMessageID: inboundMetadata(msg, metadataKeyReplyToMessage),
Sender: messageSenderFromInbound(msg.Sender),
}

// context-dependent commands check their own Runtime fields and report
// "unavailable" when the required capability is nil.
if response, handled := al.handleCommand(ctx, msg, agent, &opts); handled {
return response, nil
return agentResponse{
Content: response,
Channel: opts.Channel,
ChatID: opts.ChatID,
}, nil
}

return al.runAgentLoop(ctx, agent, opts)
Expand Down Expand Up @@ -799,9 +868,9 @@ func resolveScopeKey(route routing.ResolvedRoute, msgSessionKey string) string {
func (al *AgentLoop) processSystemMessage(
ctx context.Context,
msg bus.InboundMessage,
) (string, error) {
) (agentResponse, error) {
if msg.Channel != "system" {
return "", fmt.Errorf(
return agentResponse{}, fmt.Errorf(
"processSystemMessage called with non-system message channel: %s",
msg.Channel,
)
Expand Down Expand Up @@ -838,13 +907,13 @@ func (al *AgentLoop) processSystemMessage(
"content_len": len(content),
"channel": originChannel,
})
return "", nil
return agentResponse{}, nil
}

// Use default agent for system messages
agent := al.GetRegistry().GetDefaultAgent()
if agent == nil {
return "", fmt.Errorf("no default agent for system message")
return agentResponse{}, fmt.Errorf("no default agent for system message")
}

// Use the origin session for context
Expand All @@ -866,7 +935,7 @@ func (al *AgentLoop) runAgentLoop(
ctx context.Context,
agent *AgentInstance,
opts processOptions,
) (string, error) {
) (agentResponse, error) {
// 0. Record last channel for heartbeat notifications (skip internal channels and cli)
if opts.Channel != "" && opts.ChatID != "" {
if !constants.IsInternalChannel(opts.Channel) {
Expand Down Expand Up @@ -905,12 +974,19 @@ func (al *AgentLoop) runAgentLoop(
messages = resolveMediaRefs(messages, al.mediaStore, maxMediaSize)

// 2. Save user message to session
agent.Sessions.AddMessage(opts.SessionKey, "user", opts.UserMessage)
userMsg := providers.Message{
Role: "user",
Content: opts.UserMessage,
MessageIDs: singleMessageIDs(opts.MessageID),
ReplyToMessageID: opts.ReplyToMessageID,
Sender: opts.Sender,
}
agent.Sessions.AddFullMessage(opts.SessionKey, userMsg)

// 3. Run LLM iteration loop
finalContent, iteration, err := al.runLLMIteration(ctx, agent, messages, opts)
if err != nil {
return "", err
return agentResponse{}, err
}

// If last tool had ForUser content and we already sent it, we might not need to send final response
Expand All @@ -921,25 +997,25 @@ func (al *AgentLoop) runAgentLoop(
finalContent = opts.DefaultResponse
}

// 5. Save final assistant message to session
agent.Sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
agent.Sessions.Save(opts.SessionKey)

// 6. Optional: summarization
if opts.EnableSummary {
al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID)
response := agentResponse{
Content: finalContent,
Channel: opts.Channel,
ChatID: opts.ChatID,
}

// 7. Optional: send response via bus
if opts.SendResponse {
al.bus.PublishOutbound(ctx, bus.OutboundMessage{
Channel: opts.Channel,
ChatID: opts.ChatID,
Content: finalContent,
})
response.OnDelivered = func(msgIDs []string) {
assistantMsg := providers.Message{
Role: "assistant",
Content: finalContent,
MessageIDs: cloneMessageIDs(msgIDs),
}
agent.Sessions.AddFullMessage(opts.SessionKey, assistantMsg)
agent.Sessions.Save(opts.SessionKey)
if opts.EnableSummary {
al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID)
}
}

// 8. Log response
// 5. Log response
responsePreview := utils.Truncate(finalContent, 120)
logger.InfoCF("agent", fmt.Sprintf("Response: %s", responsePreview),
map[string]any{
Expand All @@ -949,7 +1025,7 @@ func (al *AgentLoop) runAgentLoop(
"final_length": len(finalContent),
})

return finalContent, nil
return response, nil
}

func (al *AgentLoop) targetReasoningChannelID(channelName string) (chatID string) {
Expand Down Expand Up @@ -1988,3 +2064,22 @@ func extractProvider(registry *AgentRegistry) (providers.LLMProvider, bool) {
}
return defaultAgent.Provider, true
}

// messageSenderFromInbound converts bus.SenderInfo to providers.MessageSender.
// Returns nil if no meaningful identity is present.
func messageSenderFromInbound(s bus.SenderInfo) *providers.MessageSender {
if s.Username == "" && s.FirstName == "" && s.LastName == "" && s.DisplayName == "" {
return nil
}
username := s.Username
firstName := s.FirstName
lastName := s.LastName
if firstName == "" && lastName == "" && s.DisplayName != "" {
firstName = s.DisplayName
}
return &providers.MessageSender{
Username: username,
FirstName: firstName,
LastName: lastName,
}
}
Loading
Loading