diff --git a/config/config.example.json b/config/config.example.json index 81c9014ec9..4868653c2e 100644 --- a/config/config.example.json +++ b/config/config.example.json @@ -261,6 +261,32 @@ "enabled": false }, "reasoning_channel_id": "" + }, + "mqtt": { + "enabled": false, + "broker": "tcp://broker.emqx.io:1883", + "client_id": "picoclaw-001", + "username": "", + "password": "", + "subscribe_topics": [ + "picoclaw/input" + ], + "subscribe_json_key": null, + "reply_topic": "picoclaw/output", + "reply_json_key": null, + "tls": false, + "tls_ca": "", + "tls_cert": "", + "tls_key": "", + "qos": 1, + "retain": false, + "prefix": "/mqtt", + "instruction": "Будь предельно краток", + "allow_from": [], + "group_trigger": { + "mention_only": false + }, + "reasoning_channel_id": "" } }, "providers": { diff --git a/docs/channels/mqtt/README.md b/docs/channels/mqtt/README.md new file mode 100644 index 0000000000..a54eeeebaa --- /dev/null +++ b/docs/channels/mqtt/README.md @@ -0,0 +1,142 @@ +# MQTT Channel Configuration Guide + +## 1. Example Configuration + +Add this to `config.json`: + +```json +{ + "channels": { + "mqtt": { + "enabled": true, + "broker": "tcp://localhost:1883", + "client_id": "picoclaw-bot", + "username": "", + "password": "", + "qos": 1, + "retain": false, + "tls": false, + "subscribe_topics": [ + "picoclaw/input" + ], + "subscribe_json_key": null, + "reply_topic": "picoclaw/output", + "reply_json_key": null, + "allow_from": [], + "group_trigger": { + "mention_only": true + }, + "reasoning_channel_id": "", + "instruction": "" + } + } +} +``` + +## 2. Field Reference + +| Field | Type | Required | Description | +|----------------------|----------|----------|-------------| +| enabled | bool | Yes | Enable or disable the MQTT channel | +| broker | string | Yes | MQTT broker URL (e.g., `tcp://localhost:1883`, `ssl://broker.example.com:8883`) | +| client_id | string | Yes | Unique client identifier for the MQTT connection | +| username | string | No | MQTT username for authentication | +| password | string | No | MQTT password for authentication | +| qos | int | No | Quality of Service level (0, 1, or 2). Default: 1 | +| retain | bool | No | Whether to retain messages. Default: false | +| tls | bool | No | Enable TLS/SSL connection. Default: false | +| subscribe_topics | []string | Yes | List of MQTT topics to subscribe to for incoming messages | +| subscribe_json_key | string | No | JSON key to extract from incoming messages. If null, treats message as plain text | +| reply_topic | string | No | Topic to publish replies to. Supports placeholders: `{client_id}`, `{topic}` | +| reply_json_key | string | No | JSON key to use when sending replies. If null, sends as plain text | +| allow_from | []string | No | Client ID whitelist (empty allows all) | +| group_trigger | object | No | Group trigger strategy (`mention_only` / `prefixes`) | +| reasoning_channel_id | string | No | Target channel for reasoning output | +| instruction | string | No | Optional instruction prefix added to all incoming messages | + +## 3. Currently Supported + +- **Message Format**: Supports both JSON and plain text messages + - JSON format: `{"status": "your message"}` + - Plain text: Direct text content + - Automatic JSON parsing with fallback to plain text for malformed JSON +- **JSON Key Extraction**: When `subscribe_json_key` is set, extracts specific field from JSON messages +- **JSON Response Formatting**: When `reply_json_key` is set, sends responses as JSON with specified key +- **Authentication**: Username/password authentication support +- **TLS/SSL**: Secure connections with TLS configuration +- **Quality of Service**: Configurable QoS levels (0, 1, 2) +- **Topic Management**: Multiple subscribe topics and configurable reply topics +- **Message Retention**: Optional message retention on broker +- **Auto-reconnection**: Automatic reconnection with exponential backoff +- **Group Triggers**: Support for mention-only and prefix-based triggers +- **Placeholder Replacement**: Dynamic topic names using `{client_id}` and `{topic}` placeholders + +## 4. Features + +- **Robust Message Handling**: Intelligent parsing that handles malformed JSON gracefully +- **Flexible Topic Configuration**: Support for multiple input topics and dynamic reply topics +- **JSON Message Processing**: Configurable JSON key extraction and response formatting +- **Connection Resilience**: Automatic reconnection with configurable retry intervals +- **Security**: TLS support and authentication for secure communication +- **Message Routing**: Support for reasoning channel routing and group trigger rules + +## 5. Usage Notes + +- The channel automatically handles malformed JSON by attempting to clean and parse it +- Reply topics can use placeholders to dynamically route responses +- Client IDs are used as sender identifiers in the messaging system +- Topics are treated as channels for message routing purposes +- The instruction field allows adding context or commands to all incoming messages + +## 6. JSON Configuration Examples + +### Plain Text Mode (Default) +```json +{ + "subscribe_json_key": null, + "reply_json_key": null +} +``` +- Incoming messages are treated as plain text +- Outgoing messages are sent as plain text + +### JSON Input Mode +```json +{ + "subscribe_json_key": "message", + "reply_json_key": null +} +``` +- Incoming JSON: `{"message": "Hello world", "timestamp": 1234567890}` +- Extracted content: `"Hello world"` +- Outgoing messages are sent as plain text + +### JSON Output Mode +```json +{ + "subscribe_json_key": null, + "reply_json_key": "response" +} +``` +- Incoming messages are treated as plain text +- Outgoing JSON: `{"response": "Bot reply"}` + +### Full JSON Mode +```json +{ + "subscribe_json_key": "input", + "reply_json_key": "output" +} +``` +- Incoming JSON: `{"input": "What's the weather?", "location": "Moscow"}` +- Extracted content: `"What's the weather?"` +- Outgoing JSON: `{"output": "The weather is sunny"}` + +## 7. Troubleshooting + +### Common Issues + +1. **JSON parsing fails**: Ensure your JSON messages are valid +2. **Key not found**: Verify the JSON key exists in your messages +3. **Connection issues**: Check broker URL, credentials, and TLS settings +4. **Permission denied**: Verify client ID is in the `allow_from` list if configured diff --git a/go.mod b/go.mod index cfc930d373..65c561d91f 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/anthropics/anthropic-sdk-go v1.26.0 github.com/bwmarrin/discordgo v0.29.0 github.com/caarlos0/env/v11 v11.4.0 + github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/ergochat/irc-go v0.6.0 github.com/ergochat/readline v0.1.3 github.com/gdamore/tcell/v2 v2.13.8 diff --git a/go.sum b/go.sum index f24b997d45..9ac3f9f51c 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/elliotchance/orderedmap/v3 v3.1.0 h1:j4DJ5ObEmMBt/lcwIecKcoRxIQUEnw0L804lXYDt/pg= github.com/elliotchance/orderedmap/v3 v3.1.0/go.mod h1:G+Hc2RwaZvJMcS4JpGCOyViCnGeKf0bTYCGTO4uhjSo= github.com/ergochat/irc-go v0.6.0 h1:Y0AGV76aeihJfCtLaQh+OyJKFiKGrYC0VTkeMZ6XW28= diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index ff3fa399c4..8e197e1de2 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -158,8 +158,13 @@ func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) { } // preSend handles typing stop, reaction undo, and placeholder editing before sending a message. -// Returns true if the message was already delivered (skip Send). -func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool { +// Returns true if the message was edited into a placeholder (skip Send). +func (m *Manager) preSend( + ctx context.Context, + name string, + msg bus.OutboundMessage, + ch Channel, +) bool { key := name + ":" + msg.ChatID // 1. Stop typing @@ -206,7 +211,11 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess return false } -func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.MediaStore) (*Manager, error) { +func NewManager( + cfg *config.Config, + messageBus *bus.MessageBus, + store media.MediaStore, +) (*Manager, error) { m := &Manager{ channels: make(map[string]Channel), workers: make(map[string]*channelWorker), @@ -397,6 +406,12 @@ func (m *Manager) initChannels(channels *config.ChannelsConfig) error { m.initChannel("irc", "IRC") } + if m.config.Channels.MQTT.Enabled && m.config.Channels.MQTT.Broker != "" && + m.config.Channels.MQTT.ClientID != "" && + len(m.config.Channels.MQTT.SubscribeTopics) > 0 { + m.initChannel("mqtt", "MQTT") + } + logger.InfoCF("channels", "Channel initialization completed", map[string]any{ "enabled_channels": len(m.channels), }) @@ -615,7 +630,12 @@ func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker) // - ErrNotRunning / ErrSendFailed: permanent, no retry // - ErrRateLimit: fixed delay retry // - ErrTemporary / unknown: exponential backoff retry -func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMessage) { +func (m *Manager) sendWithRetry( + ctx context.Context, + name string, + w *channelWorker, + msg bus.OutboundMessage, +) { // Rate limit: wait for token if err := w.limiter.Wait(ctx); err != nil { // ctx canceled, shutting down @@ -655,7 +675,10 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork } // ErrTemporary or unknown error — exponential backoff - backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff) + backoff := min( + time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), + maxBackoff, + ) select { case <-time.After(backoff): case <-ctx.Done(): @@ -780,12 +803,21 @@ func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWor // sendMediaWithRetry sends a media message through the channel with rate limiting and // retry logic. If the channel does not implement MediaSender, it silently skips. -func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMediaMessage) { +func (m *Manager) sendMediaWithRetry( + ctx context.Context, + name string, + w *channelWorker, + msg bus.OutboundMediaMessage, +) { ms, ok := w.ch.(MediaSender) if !ok { - logger.DebugCF("channels", "Channel does not support MediaSender, skipping media", map[string]any{ - "channel": name, - }) + logger.DebugCF( + "channels", + "Channel does not support MediaSender, skipping media", + map[string]any{ + "channel": name, + }, + ) return } @@ -822,7 +854,10 @@ func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channe } // ErrTemporary or unknown error — exponential backoff - backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff) + backoff := min( + time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), + maxBackoff, + ) select { case <-time.After(backoff): case <-ctx.Done(): diff --git a/pkg/channels/mqtt/init.go b/pkg/channels/mqtt/init.go new file mode 100644 index 0000000000..56080f6f55 --- /dev/null +++ b/pkg/channels/mqtt/init.go @@ -0,0 +1,16 @@ +package mqtt + +import ( + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/channels" + "github.com/sipeed/picoclaw/pkg/config" +) + +func init() { + channels.RegisterFactory("mqtt", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) { + if !cfg.Channels.MQTT.Enabled { + return nil, nil + } + return NewMQTTChannel(cfg.Channels.MQTT, b) + }) +} diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go new file mode 100644 index 0000000000..0e8a80e458 --- /dev/null +++ b/pkg/channels/mqtt/mqtt.go @@ -0,0 +1,344 @@ +package mqtt + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "os" + "strings" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/channels" + "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/logger" +) + +// MQTTChannel implements the Channel interface for MQTT brokers. +type MQTTChannel struct { + *channels.BaseChannel + config config.MQTTConfig + client mqtt.Client + ctx context.Context + cancel context.CancelFunc +} + +// MQTTMessage represents the JSON structure for MQTT messages. +type MQTTMessage struct { + Status string `json:"status"` +} + +// NewMQTTChannel creates a new MQTT channel. +func NewMQTTChannel(cfg config.MQTTConfig, messageBus *bus.MessageBus) (*MQTTChannel, error) { + if cfg.Broker == "" { + return nil, fmt.Errorf("mqtt broker is required") + } + if cfg.ClientID == "" { + return nil, fmt.Errorf("mqtt client_id is required") + } + if len(cfg.SubscribeTopics) == 0 { + return nil, fmt.Errorf("mqtt subscribe_topics is required") + } + + base := channels.NewBaseChannel("mqtt", cfg, messageBus, cfg.AllowFrom, + channels.WithMaxMessageLength(4000), + channels.WithGroupTrigger(cfg.GroupTrigger), + channels.WithReasoningChannelID(cfg.ReasoningChannelID), + ) + + return &MQTTChannel{ + BaseChannel: base, + config: cfg, + }, nil +} + +// Name returns the channel name. +func (c *MQTTChannel) Name() string { + return "mqtt" +} + +// Start connects to the MQTT broker and begins listening. +func (c *MQTTChannel) Start(ctx context.Context) error { + logger.InfoC("mqtt", "Starting MQTT channel") + + c.ctx, c.cancel = context.WithCancel(ctx) + + opts := mqtt.NewClientOptions() + opts.AddBroker(c.config.Broker) + opts.SetClientID(c.config.ClientID) + opts.SetUsername(c.config.Username) + opts.SetPassword(c.config.Password) + opts.SetCleanSession(true) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(5 * time.Second) + opts.SetMaxReconnectInterval(1 * time.Minute) + opts.SetKeepAlive(60 * time.Second) + opts.SetPingTimeout(10 * time.Second) + opts.SetWriteTimeout(10 * time.Second) + opts.SetConnectTimeout(30 * time.Second) + + // TLS configuration + if c.config.TLS { + tlsConfig := &tls.Config{ + InsecureSkipVerify: false, + } + + // Load CA certificate if provided + if c.config.TLSCA != "" { + caCert, err := os.ReadFile(c.config.TLSCA) + if err != nil { + return fmt.Errorf("failed to read CA certificate: %w", err) + } + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return fmt.Errorf("failed to parse CA certificate") + } + tlsConfig.RootCAs = caCertPool + } + + // Load client certificate and key if provided + if c.config.TLSCert != "" && c.config.TLSKey != "" { + cert, err := tls.LoadX509KeyPair(c.config.TLSCert, c.config.TLSKey) + if err != nil { + return fmt.Errorf("failed to load client certificate and key: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + opts.SetTLSConfig(tlsConfig) + } + + // Set message handler + opts.SetDefaultPublishHandler(c.onMessage) + + // Set connection lost handler + opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { + logger.ErrorCF("mqtt", "Connection lost", map[string]any{ + "error": err, + }) + // Connection will be automatically reconnected by the client + }) + + // Set connect handler + opts.SetOnConnectHandler(func(client mqtt.Client) { + logger.InfoC("mqtt", "Connected to MQTT broker") + + // Subscribe to topics after successful connection + var subscriptionErrors []string + for _, topic := range c.config.SubscribeTopics { + if token := c.client.Subscribe(topic, byte(c.config.QoS), nil); token.Wait() && token.Error() != nil { + errMsg := fmt.Sprintf("topic %s: %v", topic, token.Error()) + subscriptionErrors = append(subscriptionErrors, errMsg) + logger.ErrorCF("mqtt", "Failed to subscribe to topic", map[string]any{ + "topic": topic, + "error": token.Error(), + }) + } else { + logger.InfoCF("mqtt", "Subscribed to topic", map[string]any{ + "topic": topic, + }) + } + } + + // Log subscription errors summary + if len(subscriptionErrors) > 0 { + logger.ErrorCF("mqtt", "Subscription errors occurred", map[string]any{ + "errors": subscriptionErrors, + "count": len(subscriptionErrors), + }) + } + }) + + c.client = mqtt.NewClient(opts) + + // Connect with retry + if token := c.client.Connect(); token.Wait() && token.Error() != nil { + // Clean up client on connection failure + c.client = nil + return fmt.Errorf("mqtt connect failed: %w", token.Error()) + } + + logger.InfoCF("mqtt", "Connected to MQTT broker", map[string]any{ + "broker": c.config.Broker, + "client_id": c.config.ClientID, + }) + + c.SetRunning(true) + logger.InfoC("mqtt", "MQTT channel started") + + return nil +} + +// Stop disconnects from the MQTT broker. +func (c *MQTTChannel) Stop(ctx context.Context) error { + logger.InfoC("mqtt", "Stopping MQTT channel") + c.SetRunning(false) + + if c.cancel != nil { + c.cancel() + } + + if c.client != nil && c.client.IsConnected() { + c.client.Disconnect(250) + } + + logger.InfoC("mqtt", "MQTT channel stopped") + return nil +} + +// onMessage handles incoming MQTT messages. +func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { + logger.DebugCF("mqtt", "Received message", map[string]any{ + "topic": msg.Topic(), + "payload": string(msg.Payload()), + }) + + var content string + + // Check if subscribe_json_key is configured + if c.config.SubscribeJSONKey != nil && *c.config.SubscribeJSONKey != "" { + // Parse as JSON and extract the specified key + var jsonMsg map[string]any + if err := json.Unmarshal(msg.Payload(), &jsonMsg); err == nil { + // Successfully parsed as JSON + if value, exists := jsonMsg[*c.config.SubscribeJSONKey]; exists { + content = fmt.Sprintf("%v", value) + logger.InfoCF("mqtt", "Extracted JSON value", map[string]any{ + "key": *c.config.SubscribeJSONKey, + "content": content, + }) + } else { + logger.WarnCF("mqtt", "JSON key not found in message", map[string]any{ + "key": *c.config.SubscribeJSONKey, + }) + content = string(msg.Payload()) // Fall back to raw payload + } + } else { + // JSON parsing failed, treat as plain text + logger.InfoCF("mqtt", "JSON parsing failed, treating as plain text", map[string]any{ + "error": err.Error(), + "payload": string(msg.Payload()), + }) + content = string(msg.Payload()) + } + } else { + // No JSON key configured, treat as plain text + content = string(msg.Payload()) + logger.InfoCF("mqtt", "Received plain text message", map[string]any{ + "payload": content, + }) + } + + if content == "" { + logger.WarnC("mqtt", "Empty content in MQTT message") + return + } + + // Determine sender ID + senderID := fmt.Sprintf("mqtt:%s", c.config.ClientID) + if msg.Topic() != "" { + senderID = fmt.Sprintf("mqtt:%s", strings.ReplaceAll(msg.Topic(), "/", "_")) + } + + // Check if message has reply-to header (in payload or topic) + replyTopic := c.config.ReplyTopic + if strings.Contains(content, "reply-to:") { + // Improved parsing for reply-to: extract the last occurrence + lastIndex := strings.LastIndex(content, "reply-to:") + if lastIndex != -1 { + replyTopicPart := content[lastIndex+len("reply-to:"):] + // Find the end of the reply-to value (newline or end of string) + if newlineIndex := strings.Index(replyTopicPart, "\n"); newlineIndex != -1 { + replyTopic = strings.TrimSpace(replyTopicPart[:newlineIndex]) + content = strings.TrimSpace(content[:lastIndex]) + strings.TrimSpace(replyTopicPart[newlineIndex:]) + } else { + replyTopic = strings.TrimSpace(replyTopicPart) + content = strings.TrimSpace(content[:lastIndex]) + } + } + } + + // Create peer and sender info + peer := bus.Peer{Kind: "direct", ID: msg.Topic()} // MQTT topics are like channels + sender := bus.SenderInfo{ + Platform: "mqtt", + PlatformID: senderID, + CanonicalID: senderID, + Username: senderID, + DisplayName: senderID, + } + + messageID := fmt.Sprintf("mqtt-%d", time.Now().UnixNano()) + + metadata := map[string]string{ + "platform": "mqtt", + "topic": msg.Topic(), + "reply_topic": replyTopic, + } + + if c.config.Instruction != "" { + content = c.config.Instruction + "\n\n" + content + } + + c.HandleMessage(c.ctx, peer, messageID, senderID, msg.Topic(), content, nil, metadata, sender) +} + +// Send sends a message to an MQTT topic. +func (c *MQTTChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { + if !c.IsRunning() { + return fmt.Errorf("MQTT channel is not running") + } + + replyTopic := c.config.ReplyTopic + + // If chatID contains reply topic info, use it + if strings.HasPrefix(msg.ChatID, "reply:") { + replyTopic = strings.TrimPrefix(msg.ChatID, "reply:") + } + + // Replace placeholders in reply topic + replyTopic = strings.ReplaceAll(replyTopic, "{client_id}", c.config.ClientID) + replyTopic = strings.ReplaceAll(replyTopic, "{topic}", msg.ChatID) + replyTopic = strings.ReplaceAll(replyTopic, "{timestamp}", fmt.Sprintf("%d", time.Now().Unix())) + replyTopic = strings.ReplaceAll(replyTopic, "{message_id}", msg.ReplyToMessageID) + // Add more placeholders as needed + + // Validate reply topic + if replyTopic == "" { + return fmt.Errorf("reply topic is empty and no default configured") + } + + var payload []byte + var err error + + // Check if reply_json_key is configured + if c.config.ReplyJSONKey != nil && *c.config.ReplyJSONKey != "" { + // Send as JSON with the specified key + jsonMsg := map[string]string{ + *c.config.ReplyJSONKey: msg.Content, + } + payload, err = json.Marshal(jsonMsg) + if err != nil { + return fmt.Errorf("failed to marshal MQTT JSON message: %w", err) + } + } else { + // Send as plain text + payload = []byte(msg.Content) + } + + token := c.client.Publish(replyTopic, byte(c.config.QoS), c.config.Retain, payload) + if token.Wait() && token.Error() != nil { + return fmt.Errorf("failed to publish MQTT message: %w", token.Error()) + } + + logger.DebugCF("mqtt", "Published message", map[string]any{ + "topic": replyTopic, + "payload": string(payload), + }) + return nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 235cb06410..36d8ad50f8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -299,6 +299,7 @@ type ChannelsConfig struct { Pico PicoConfig `json:"pico"` PicoClient PicoClientConfig `json:"pico_client"` IRC IRCConfig `json:"irc"` + MQTT MQTTConfig `json:"mqtt"` } // GroupTriggerConfig controls when the bot responds in group chats. @@ -541,6 +542,29 @@ type IRCConfig struct { ReasoningChannelID string `json:"reasoning_channel_id" env:"PICOCLAW_CHANNELS_IRC_REASONING_CHANNEL_ID"` } +type MQTTConfig struct { + Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_MQTT_ENABLED"` + Broker string `json:"broker" env:"PICOCLAW_CHANNELS_MQTT_BROKER"` + ClientID string `json:"client_id" env:"PICOCLAW_CHANNELS_MQTT_CLIENT_ID"` + Username string `json:"username" env:"PICOCLAW_CHANNELS_MQTT_USERNAME"` + Password string `json:"password" env:"PICOCLAW_CHANNELS_MQTT_PASSWORD"` + SubscribeTopics []string `json:"subscribe_topics" env:"PICOCLAW_CHANNELS_MQTT_SUBSCRIBE_TOPICS"` + SubscribeJSONKey *string `json:"subscribe_json_key,omitempty"` + ReplyTopic string `json:"reply_topic" env:"PICOCLAW_CHANNELS_MQTT_REPLY_TOPIC"` + ReplyJSONKey *string `json:"reply_json_key,omitempty"` + TLS bool `json:"tls" env:"PICOCLAW_CHANNELS_MQTT_TLS"` + TLSCA string `json:"tls_ca" env:"PICOCLAW_CHANNELS_MQTT_TLS_CA"` + TLSCert string `json:"tls_cert" env:"PICOCLAW_CHANNELS_MQTT_TLS_CERT"` + TLSKey string `json:"tls_key" env:"PICOCLAW_CHANNELS_MQTT_TLS_KEY"` + QoS int `json:"qos" env:"PICOCLAW_CHANNELS_MQTT_QOS"` + Retain bool `json:"retain" env:"PICOCLAW_CHANNELS_MQTT_RETAIN"` + Prefix string `json:"prefix" env:"PICOCLAW_CHANNELS_MQTT_PREFIX"` + Instruction string `json:"instruction" env:"PICOCLAW_CHANNELS_MQTT_INSTRUCTION"` + AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_MQTT_ALLOW_FROM"` + GroupTrigger GroupTriggerConfig `json:"group_trigger,omitempty"` + ReasoningChannelID string `json:"reasoning_channel_id" env:"PICOCLAW_CHANNELS_MQTT_REASONING_CHANNEL_ID"` +} + type HeartbeatConfig struct { Enabled bool `json:"enabled" env:"PICOCLAW_HEARTBEAT_ENABLED"` Interval int `json:"interval" env:"PICOCLAW_HEARTBEAT_INTERVAL"` // minutes, min 5 @@ -911,10 +935,13 @@ func LoadConfig(path string) (*Config, error) { if passphrase := credential.PassphraseProvider(); passphrase != "" { for _, m := range cfg.ModelList { - if m.APIKey != "" && !strings.HasPrefix(m.APIKey, "enc://") && !strings.HasPrefix(m.APIKey, "file://") { - fmt.Fprintf(os.Stderr, + if m.APIKey != "" && !strings.HasPrefix(m.APIKey, "enc://") && + !strings.HasPrefix(m.APIKey, "file://") { + fmt.Fprintf( + os.Stderr, "picoclaw: warning: model %q has a plaintext api_key; call SaveConfig to encrypt it\n", - m.ModelName) + m.ModelName, + ) } } } @@ -967,7 +994,8 @@ func encryptPlaintextAPIKeys(models []ModelConfig, passphrase string) ([]ModelCo changed := false for i := range sealed { m := &sealed[i] - if m.APIKey == "" || strings.HasPrefix(m.APIKey, "enc://") || strings.HasPrefix(m.APIKey, "file://") { + if m.APIKey == "" || strings.HasPrefix(m.APIKey, "enc://") || + strings.HasPrefix(m.APIKey, "file://") { continue } encrypted, err := credential.Encrypt(passphrase, "", m.APIKey) @@ -1000,7 +1028,13 @@ func resolveAPIKeys(models []ModelConfig, configDir string) error { for j, key := range models[i].APIKeys { resolved, err := cr.Resolve(key) if err != nil { - return fmt.Errorf("model_list[%d] (%s): api_keys[%d]: %w", i, models[i].ModelName, j, err) + return fmt.Errorf( + "model_list[%d] (%s): api_keys[%d]: %w", + i, + models[i].ModelName, + j, + err, + ) } models[i].APIKeys[j] = resolved } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 9a2706b3b8..1a9ddf8d57 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -21,6 +21,7 @@ import ( _ "github.com/sipeed/picoclaw/pkg/channels/line" _ "github.com/sipeed/picoclaw/pkg/channels/maixcam" _ "github.com/sipeed/picoclaw/pkg/channels/matrix" + _ "github.com/sipeed/picoclaw/pkg/channels/mqtt" _ "github.com/sipeed/picoclaw/pkg/channels/onebot" _ "github.com/sipeed/picoclaw/pkg/channels/pico" _ "github.com/sipeed/picoclaw/pkg/channels/qq" @@ -43,20 +44,29 @@ import ( ) const ( - serviceShutdownTimeout = 30 * time.Second - providerReloadTimeout = 30 * time.Second + serviceShutdownTimeout = 30 * time.Second + + providerReloadTimeout = 30 * time.Second + gracefulShutdownTimeout = 15 * time.Second ) type services struct { - CronService *cron.CronService + CronService *cron.CronService + HeartbeatService *heartbeat.HeartbeatService - MediaStore media.MediaStore - ChannelManager *channels.Manager - DeviceService *devices.Service - HealthServer *health.Server + + MediaStore media.MediaStore + + ChannelManager *channels.Manager + + DeviceService *devices.Service + + HealthServer *health.Server + manualReloadChan chan struct{} - reloading atomic.Bool + + reloading atomic.Bool } type startupBlockedProvider struct { @@ -105,6 +115,7 @@ func Run(debug bool, configPath string, allowEmptyStartup bool) error { startupInfo := agentLoop.GetStartupInfo() toolsInfo := startupInfo["tools"].(map[string]any) skillsInfo := startupInfo["skills"].(map[string]any) + fmt.Printf(" • Tools: %d loaded\n", toolsInfo["count"]) fmt.Printf(" • Skills: %d/%d available\n", skillsInfo["available"], skillsInfo["total"]) @@ -123,6 +134,7 @@ func Run(debug bool, configPath string, allowEmptyStartup bool) error { // Setup manual reload channel for /reload endpoint manualReloadChan := make(chan struct{}, 1) runningServices.manualReloadChan = manualReloadChan + reloadTrigger := func() error { if !runningServices.reloading.CompareAndSwap(false, true) { return fmt.Errorf("reload already in progress") @@ -136,6 +148,7 @@ func Run(debug bool, configPath string, allowEmptyStartup bool) error { return fmt.Errorf("reload already queued") } } + runningServices.HealthServer.SetReloadFunc(reloadTrigger) agentLoop.SetReloadFunc(reloadTrigger) @@ -149,6 +162,7 @@ func Run(debug bool, configPath string, allowEmptyStartup bool) error { var configReloadChan <-chan *config.Config stopWatch := func() {} + if cfg.Gateway.HotReload { configReloadChan, stopWatch = setupConfigWatcherPolling(configPath, debug) logger.Info("Config hot reload enabled") @@ -164,6 +178,7 @@ func Run(debug bool, configPath string, allowEmptyStartup bool) error { logger.Info("Shutting down...") shutdownGateway(runningServices, agentLoop, provider, true) return nil + case newCfg := <-configReloadChan: if !runningServices.reloading.CompareAndSwap(false, true) { logger.Warn("Config reload skipped: another reload is in progress") @@ -173,6 +188,7 @@ func Run(debug bool, configPath string, allowEmptyStartup bool) error { if err != nil { logger.Errorf("Config reload failed: %v", err) } + case <-manualReloadChan: logger.Info("Manual reload triggered via /reload endpoint") newCfg, err := config.LoadConfig(configPath) @@ -222,7 +238,6 @@ func createStartupProvider( }) return &startupBlockedProvider{reason: reason}, "", nil } - return providers.CreateProvider(cfg) } @@ -232,8 +247,8 @@ func setupAndStartServices( msgBus *bus.MessageBus, ) (*services, error) { runningServices := &services{} - execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute + var err error runningServices.CronService, err = setupCronTool( agentLoop, @@ -279,7 +294,6 @@ func setupAndStartServices( } return nil, fmt.Errorf("error creating channel manager: %w", err) } - agentLoop.SetChannelManager(runningServices.ChannelManager) agentLoop.SetMediaStore(runningServices.MediaStore) @@ -298,11 +312,9 @@ func setupAndStartServices( addr := fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port) runningServices.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) runningServices.ChannelManager.SetupHTTPServer(addr, runningServices.HealthServer) - if err = runningServices.ChannelManager.StartAll(context.Background()); err != nil { return nil, fmt.Errorf("error starting channels: %w", err) } - fmt.Printf( "✓ Health endpoints available at http://%s:%d/health, /ready and /reload (POST)\n", cfg.Gateway.Host, @@ -332,15 +344,19 @@ func stopAndCleanupServices(runningServices *services, shutdownTimeout time.Dura if !isReload && runningServices.ChannelManager != nil { runningServices.ChannelManager.StopAll(shutdownCtx) } + if runningServices.DeviceService != nil { runningServices.DeviceService.Stop() } + if runningServices.HeartbeatService != nil { runningServices.HeartbeatService.Stop() } + if runningServices.CronService != nil { runningServices.CronService.Stop() } + if runningServices.MediaStore != nil { if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { fms.Stop() @@ -357,12 +373,9 @@ func shutdownGateway( if cp, ok := provider.(providers.StatefulProvider); ok && fullShutdown { cp.Close() } - stopAndCleanupServices(runningServices, gracefulShutdownTimeout, false) - agentLoop.Stop() agentLoop.Close() - logger.Info("✓ Gateway stopped") } @@ -383,7 +396,6 @@ func handleConfigReload( } logger.Infof(" New model is '%s', recreating provider...", newModel) - logger.Info(" Stopping all services...") stopAndCleanupServices(runningServices, serviceShutdownTimeout, true) @@ -434,8 +446,8 @@ func restartServices( msgBus *bus.MessageBus, ) error { cfg := al.GetConfig() - execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute + var err error runningServices.CronService, err = setupCronTool( al, @@ -489,6 +501,7 @@ func restartServices( } addr := fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port) + // Reuse existing HealthServer to preserve reloadFunc if runningServices.HealthServer == nil { runningServices.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) @@ -514,6 +527,7 @@ func restartServices( transcriber := voice.DetectTranscriber(cfg) al.SetTranscriber(transcriber) + if transcriber != nil { logger.InfoCF("voice", "Transcription re-enabled (agent-level)", map[string]any{"provider": transcriber.Name()}) } else { @@ -534,7 +548,6 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf lastModTime := getFileModTime(configPath) lastSize := getFileSize(configPath) - ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() @@ -550,7 +563,6 @@ func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Conf } time.Sleep(500 * time.Millisecond) - lastModTime = currentModTime lastSize = currentSize @@ -614,7 +626,6 @@ func setupCronTool( cfg *config.Config, ) (*cron.CronService, error) { cronStorePath := filepath.Join(workspace, "cron", "jobs.json") - cronService := cron.NewCronService(cronStorePath, nil) var cronTool *tools.CronTool @@ -624,7 +635,6 @@ func setupCronTool( if err != nil { return nil, fmt.Errorf("critical error during CronTool initialization: %w", err) } - agentLoop.RegisterTool(cronTool) } @@ -648,9 +658,11 @@ func createHeartbeatHandler(agentLoop *agent.AgentLoop) func(prompt, channel, ch if err != nil { return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err)) } + if response == "HEARTBEAT_OK" { return tools.SilentResult("Heartbeat OK") } + return tools.SilentResult(response) } }