From e6a78267df6571f2804e2bb536bcbaba67ae6100 Mon Sep 17 00:00:00 2001 From: Live Control Date: Sun, 15 Mar 2026 15:34:52 +0300 Subject: [PATCH 01/10] channel mqtt --- config/config.example.json | 24 +++ docs/channels/mqtt/README.md | 80 +++++++++ go.mod | 8 + go.sum | 2 + pkg/channels/manager.go | 4 + pkg/channels/mqtt/init.go | 16 ++ pkg/channels/mqtt/mqtt.go | 314 +++++++++++++++++++++++++++++++++++ pkg/config/config.go | 22 +++ pkg/gateway/gateway.go | 1 + 9 files changed, 471 insertions(+) create mode 100644 docs/channels/mqtt/README.md create mode 100644 pkg/channels/mqtt/init.go create mode 100644 pkg/channels/mqtt/mqtt.go diff --git a/config/config.example.json b/config/config.example.json index 81c9014ec..43bbb9ad8 100644 --- a/config/config.example.json +++ b/config/config.example.json @@ -261,6 +261,30 @@ "enabled": false }, "reasoning_channel_id": "" + }, + "mqtt": { + "enabled": false, + "broker": "tcp://broker.emqx.io:1883", + "client_id": "picoclaw-001", + "username": "", + "password": "", + "subscribe_topics": [ + "picoclaw/input" + ], + "reply_topic": "picoclaw/output", + "tls": false, + "tls_ca": "", + "tls_cert": "", + "tls_key": "", + "qos": 1, + "retain": false, + "prefix": "/mqtt", + "instruction": "Будь предельно краток", + "allow_from": [], + "group_trigger": { + "mention_only": false + }, + "reasoning_channel_id": "" } }, "providers": { diff --git a/docs/channels/mqtt/README.md b/docs/channels/mqtt/README.md new file mode 100644 index 000000000..370c44ffa --- /dev/null +++ b/docs/channels/mqtt/README.md @@ -0,0 +1,80 @@ +# MQTT Channel Configuration Guide + +## 1. Example Configuration + +Add this to `config.json`: + +```json +{ + "channels": { + "mqtt": { + "enabled": true, + "broker": "tcp://localhost:1883", + "client_id": "picoclaw-bot", + "username": "", + "password": "", + "qos": 1, + "retain": false, + "tls": false, + "subscribe_topics": ["picoclaw/chat"], + "reply_topic": "picoclaw/reply", + "allow_from": [], + "group_trigger": { + "mention_only": true + }, + "reasoning_channel_id": "", + "instruction": "" + } + } +} +``` + +## 2. Field Reference + +| Field | Type | Required | Description | +|----------------------|----------|----------|-------------| +| enabled | bool | Yes | Enable or disable the MQTT channel | +| broker | string | Yes | MQTT broker URL (e.g., `tcp://localhost:1883`, `ssl://broker.example.com:8883`) | +| client_id | string | Yes | Unique client identifier for the MQTT connection | +| username | string | No | MQTT username for authentication | +| password | string | No | MQTT password for authentication | +| qos | int | No | Quality of Service level (0, 1, or 2). Default: 1 | +| retain | bool | No | Whether to retain messages. Default: false | +| tls | bool | No | Enable TLS/SSL connection. Default: false | +| subscribe_topics | []string | Yes | List of MQTT topics to subscribe to for incoming messages | +| reply_topic | string | No | Topic to publish replies to. Supports placeholders: `{client_id}`, `{topic}` | +| allow_from | []string | No | Client ID whitelist (empty allows all) | +| group_trigger | object | No | Group trigger strategy (`mention_only` / `prefixes`) | +| reasoning_channel_id | string | No | Target channel for reasoning output | +| instruction | string | No | Optional instruction prefix added to all incoming messages | + +## 3. Currently Supported + +- **Message Format**: Supports both JSON and plain text messages + - JSON format: `{"status": "your message"}` + - Plain text: Direct text content + - Automatic JSON parsing with fallback to plain text for malformed JSON +- **Authentication**: Username/password authentication support +- **TLS/SSL**: Secure connections with TLS configuration +- **Quality of Service**: Configurable QoS levels (0, 1, 2) +- **Topic Management**: Multiple subscribe topics and configurable reply topics +- **Message Retention**: Optional message retention on broker +- **Auto-reconnection**: Automatic reconnection with exponential backoff +- **Group Triggers**: Support for mention-only and prefix-based triggers +- **Placeholder Replacement**: Dynamic topic names using `{client_id}` and `{topic}` placeholders + +## 4. Features + +- **Robust Message Handling**: Intelligent parsing that handles malformed JSON gracefully +- **Flexible Topic Configuration**: Support for multiple input topics and dynamic reply topics +- **Connection Resilience**: Automatic reconnection with configurable retry intervals +- **Security**: TLS support and authentication for secure communication +- **Message Routing**: Support for reasoning channel routing and group trigger rules + +## 5. Usage Notes + +- The channel automatically handles malformed JSON by attempting to clean and parse it +- Reply topics can use placeholders to dynamically route responses +- Client IDs are used as sender identifiers in the messaging system +- Topics are treated as channels for message routing purposes +- The instruction field allows adding context or commands to all incoming messages \ No newline at end of file diff --git a/go.mod b/go.mod index cfc930d37..1376ae687 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/bwmarrin/discordgo v0.29.0 github.com/caarlos0/env/v11 v11.4.0 github.com/ergochat/irc-go v0.6.0 + github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/ergochat/readline v0.1.3 github.com/gdamore/tcell/v2 v2.13.8 github.com/gomarkdown/markdown v0.0.0-20260217112301-37c66b85d6ab @@ -93,8 +94,15 @@ require ( github.com/valyala/fastjson v1.6.10 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect golang.org/x/arch v0.24.0 // indirect +<<<<<<< HEAD golang.org/x/crypto v0.49.0 golang.org/x/net v0.52.0 golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.42.0 // indirect +======= + golang.org/x/crypto v0.48.0 + golang.org/x/net v0.51.0 + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.41.0 // indirect +>>>>>>> 1038a05 (channel mqtt) ) diff --git a/go.sum b/go.sum index f24b997d4..cb338e77e 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/elliotchance/orderedmap/v3 v3.1.0 h1:j4DJ5ObEmMBt/lcwIecKcoRxIQUEnw0L804lXYDt/pg= github.com/elliotchance/orderedmap/v3 v3.1.0/go.mod h1:G+Hc2RwaZvJMcS4JpGCOyViCnGeKf0bTYCGTO4uhjSo= github.com/ergochat/irc-go v0.6.0 h1:Y0AGV76aeihJfCtLaQh+OyJKFiKGrYC0VTkeMZ6XW28= diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index ff3fa399c..7a1e090e8 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -397,6 +397,10 @@ func (m *Manager) initChannels(channels *config.ChannelsConfig) error { m.initChannel("irc", "IRC") } + if m.config.Channels.MQTT.Enabled && m.config.Channels.MQTT.Broker != "" && m.config.Channels.MQTT.ClientID != "" && len(m.config.Channels.MQTT.SubscribeTopics) > 0 { + m.initChannel("mqtt", "MQTT") + } + logger.InfoCF("channels", "Channel initialization completed", map[string]any{ "enabled_channels": len(m.channels), }) diff --git a/pkg/channels/mqtt/init.go b/pkg/channels/mqtt/init.go new file mode 100644 index 000000000..c5f77713f --- /dev/null +++ b/pkg/channels/mqtt/init.go @@ -0,0 +1,16 @@ +package mqtt + +import ( + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/channels" + "github.com/sipeed/picoclaw/pkg/config" +) + +func init() { + channels.RegisterFactory("mqtt", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) { + if !cfg.Channels.MQTT.Enabled { + return nil, nil + } + return NewMQTTChannel(cfg.Channels.MQTT, b) + }) +} \ No newline at end of file diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go new file mode 100644 index 000000000..d6efe040d --- /dev/null +++ b/pkg/channels/mqtt/mqtt.go @@ -0,0 +1,314 @@ +package mqtt + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "strings" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + + "github.com/sipeed/picoclaw/pkg/bus" + "github.com/sipeed/picoclaw/pkg/channels" + "github.com/sipeed/picoclaw/pkg/config" + "github.com/sipeed/picoclaw/pkg/logger" +) + +// MQTTChannel implements the Channel interface for MQTT brokers. +type MQTTChannel struct { + *channels.BaseChannel + config config.MQTTConfig + client mqtt.Client + ctx context.Context + cancel context.CancelFunc +} + +// MQTTMessage represents the JSON structure for MQTT messages. +type MQTTMessage struct { + Status string `json:"status"` +} + +// NewMQTTChannel creates a new MQTT channel. +func NewMQTTChannel(cfg config.MQTTConfig, messageBus *bus.MessageBus) (*MQTTChannel, error) { + if cfg.Broker == "" { + return nil, fmt.Errorf("mqtt broker is required") + } + if cfg.ClientID == "" { + return nil, fmt.Errorf("mqtt client_id is required") + } + if len(cfg.SubscribeTopics) == 0 { + return nil, fmt.Errorf("mqtt subscribe_topics is required") + } + + base := channels.NewBaseChannel("mqtt", cfg, messageBus, cfg.AllowFrom, + channels.WithMaxMessageLength(4000), + channels.WithGroupTrigger(cfg.GroupTrigger), + channels.WithReasoningChannelID(cfg.ReasoningChannelID), + ) + + return &MQTTChannel{ + BaseChannel: base, + config: cfg, + }, nil +} + +// Name returns the channel name. +func (c *MQTTChannel) Name() string { + return "mqtt" +} + +// Start connects to the MQTT broker and begins listening. +func (c *MQTTChannel) Start(ctx context.Context) error { + logger.InfoC("mqtt", "Starting MQTT channel") + + c.ctx, c.cancel = context.WithCancel(ctx) + + opts := mqtt.NewClientOptions() + opts.AddBroker(c.config.Broker) + opts.SetClientID(c.config.ClientID) + opts.SetUsername(c.config.Username) + opts.SetPassword(c.config.Password) + opts.SetCleanSession(true) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(5 * time.Second) + opts.SetMaxReconnectInterval(5 * time.Minute) + opts.SetKeepAlive(60 * time.Second) + opts.SetPingTimeout(10 * time.Second) + opts.SetWriteTimeout(10 * time.Second) + opts.SetConnectTimeout(30 * time.Second) + + // TLS configuration + if c.config.TLS { + tlsConfig := &tls.Config{ + InsecureSkipVerify: false, + } + if c.config.TLSCA != "" { + // Load CA cert if provided + // For simplicity, assuming file path + // In production, load cert properly + } + if c.config.TLSCert != "" && c.config.TLSKey != "" { + // Load client cert + } + opts.SetTLSConfig(tlsConfig) + } + + // Set message handler + opts.SetDefaultPublishHandler(c.onMessage) + + c.client = mqtt.NewClient(opts) + + // Connect with retry + if token := c.client.Connect(); token.Wait() && token.Error() != nil { + return fmt.Errorf("mqtt connect failed: %w", token.Error()) + } + + logger.InfoCF("mqtt", "Connected to MQTT broker", map[string]any{ + "broker": c.config.Broker, + "client_id": c.config.ClientID, + }) + + // Subscribe to topics + for _, topic := range c.config.SubscribeTopics { + if token := c.client.Subscribe(topic, byte(c.config.QoS), nil); token.Wait() && token.Error() != nil { + logger.ErrorCF("mqtt", "Failed to subscribe to topic", map[string]any{ + "topic": topic, + "error": token.Error(), + }) + } else { + logger.InfoCF("mqtt", "Subscribed to topic", map[string]any{ + "topic": topic, + }) + } + } + + c.SetRunning(true) + logger.InfoC("mqtt", "MQTT channel started") + return nil +} + +// Stop disconnects from the MQTT broker. +func (c *MQTTChannel) Stop(ctx context.Context) error { + logger.InfoC("mqtt", "Stopping MQTT channel") + c.SetRunning(false) + + if c.cancel != nil { + c.cancel() + } + + if c.client != nil && c.client.IsConnected() { + c.client.Disconnect(250) + } + + logger.InfoC("mqtt", "MQTT channel stopped") + return nil +} + +// onMessage handles incoming MQTT messages. +func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { + logger.DebugCF("mqtt", "Received message", map[string]any{ + "topic": msg.Topic(), + "payload": string(msg.Payload()), + }) + + // Try to parse as JSON first + var mqttMsg MQTTMessage + var content string + var err error + + // First try to parse as JSON + if err = json.Unmarshal(msg.Payload(), &mqttMsg); err == nil { + // Successfully parsed as JSON + content = mqttMsg.Status + } else { + // If JSON parsing fails, try to clean up common malformed JSON issues + payloadStr := string(msg.Payload()) + + // Try to extract JSON from malformed strings (e.g., extra quotes or braces) + // Look for a valid JSON object within the string + if strings.HasPrefix(payloadStr, "{") && (strings.HasSuffix(payloadStr, "}") || strings.HasSuffix(payloadStr, "}\"")) { + // Try to parse as-is first + if err2 := json.Unmarshal(msg.Payload(), &mqttMsg); err2 == nil { + content = mqttMsg.Status + } else { + // Try to clean up common issues + cleaned := strings.TrimSpace(payloadStr) + + // Remove extra quotes and braces from the end + for strings.HasSuffix(cleaned, "}") && strings.Count(cleaned, "{") < strings.Count(cleaned, "}") { + cleaned = cleaned[:len(cleaned)-1] + } + for strings.HasSuffix(cleaned, "\"}") && strings.Count(cleaned, "{") < strings.Count(cleaned, "}") { + cleaned = cleaned[:len(cleaned)-1] + } + // Remove any trailing quotes (simple check for extra quotes at the end) + for strings.HasSuffix(cleaned, "\"") && !strings.HasSuffix(cleaned, "\"}") { + cleaned = cleaned[:len(cleaned)-1] + } + + // Remove extra opening braces + for strings.HasPrefix(cleaned, "{") && strings.Count(cleaned, "{") > strings.Count(cleaned, "}") { + cleaned = cleaned[1:] + } + + if cleaned != payloadStr { + if err3 := json.Unmarshal([]byte(cleaned), &mqttMsg); err3 == nil { + content = mqttMsg.Status + logger.InfoCF("mqtt", "Successfully parsed cleaned JSON", map[string]any{ + "original": payloadStr, + "cleaned": cleaned, + }) + } else { + // Fall back to plain text + content = payloadStr + logger.InfoCF("mqtt", "Received plain text message (JSON parsing failed)", map[string]any{ + "error": err.Error(), + "payload": content, + }) + } + } else { + // Fall back to plain text + content = payloadStr + logger.InfoCF("mqtt", "Received plain text message (JSON parsing failed)", map[string]any{ + "error": err.Error(), + "payload": content, + }) + } + } + } else { + // Not JSON-like, treat as plain text + content = payloadStr + logger.InfoCF("mqtt", "Received plain text message (not JSON-like)", map[string]any{ + "payload": content, + }) + } + } + + if content == "" { + logger.WarnC("mqtt", "Empty content in MQTT message") + return + } + + // Determine sender ID + senderID := fmt.Sprintf("mqtt:%s", c.config.ClientID) + if msg.Topic() != "" { + senderID = fmt.Sprintf("mqtt:%s", strings.ReplaceAll(msg.Topic(), "/", "_")) + } + + // Check if message has reply-to header (in payload or topic) + replyTopic := c.config.ReplyTopic + if strings.Contains(content, "reply-to:") { + // Simple parsing for reply-to + parts := strings.SplitN(content, "reply-to:", 2) + if len(parts) == 2 { + replyTopic = strings.TrimSpace(parts[1]) + content = strings.TrimSpace(parts[0]) + } + } + + // Create peer and sender info + peer := bus.Peer{Kind: "direct", ID: msg.Topic()} // MQTT topics are like channels + sender := bus.SenderInfo{ + Platform: "mqtt", + PlatformID: senderID, + CanonicalID: senderID, + Username: senderID, + DisplayName: senderID, + } + + messageID := fmt.Sprintf("mqtt-%d", time.Now().UnixNano()) + + metadata := map[string]string{ + "platform": "mqtt", + "topic": msg.Topic(), + "reply_topic": replyTopic, + } + + if c.config.Instruction != "" { + content = c.config.Instruction + "\n\n" + content + } + + c.HandleMessage(c.ctx, peer, messageID, senderID, msg.Topic(), content, nil, metadata, sender) +} + +// Send sends a message to an MQTT topic. +func (c *MQTTChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { + if !c.IsRunning() { + return fmt.Errorf("MQTT channel is not running") + } + + replyTopic := c.config.ReplyTopic + + // If chatID contains reply topic info, use it + if strings.HasPrefix(msg.ChatID, "reply:") { + replyTopic = strings.TrimPrefix(msg.ChatID, "reply:") + } + + // Replace placeholders in reply topic + replyTopic = strings.ReplaceAll(replyTopic, "{client_id}", c.config.ClientID) + replyTopic = strings.ReplaceAll(replyTopic, "{topic}", msg.ChatID) + // Add more placeholders as needed + + mqttMsg := MQTTMessage{ + Status: msg.Content, + } + + payload, err := json.Marshal(mqttMsg) + if err != nil { + return fmt.Errorf("failed to marshal MQTT message: %w", err) + } + + token := c.client.Publish(replyTopic, byte(c.config.QoS), c.config.Retain, payload) + if token.Wait() && token.Error() != nil { + return fmt.Errorf("failed to publish MQTT message: %w", token.Error()) + } + + logger.DebugCF("mqtt", "Published message", map[string]any{ + "topic": replyTopic, + "payload": string(payload), + }) + return nil +} \ No newline at end of file diff --git a/pkg/config/config.go b/pkg/config/config.go index 235cb0641..f840e4d2f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -299,6 +299,7 @@ type ChannelsConfig struct { Pico PicoConfig `json:"pico"` PicoClient PicoClientConfig `json:"pico_client"` IRC IRCConfig `json:"irc"` + MQTT MQTTConfig `json:"mqtt"` } // GroupTriggerConfig controls when the bot responds in group chats. @@ -541,6 +542,27 @@ type IRCConfig struct { ReasoningChannelID string `json:"reasoning_channel_id" env:"PICOCLAW_CHANNELS_IRC_REASONING_CHANNEL_ID"` } +type MQTTConfig struct { + Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_MQTT_ENABLED"` + Broker string `json:"broker" env:"PICOCLAW_CHANNELS_MQTT_BROKER"` + ClientID string `json:"client_id" env:"PICOCLAW_CHANNELS_MQTT_CLIENT_ID"` + Username string `json:"username" env:"PICOCLAW_CHANNELS_MQTT_USERNAME"` + Password string `json:"password" env:"PICOCLAW_CHANNELS_MQTT_PASSWORD"` + SubscribeTopics []string `json:"subscribe_topics" env:"PICOCLAW_CHANNELS_MQTT_SUBSCRIBE_TOPICS"` + ReplyTopic string `json:"reply_topic" env:"PICOCLAW_CHANNELS_MQTT_REPLY_TOPIC"` + TLS bool `json:"tls" env:"PICOCLAW_CHANNELS_MQTT_TLS"` + TLSCA string `json:"tls_ca" env:"PICOCLAW_CHANNELS_MQTT_TLS_CA"` + TLSCert string `json:"tls_cert" env:"PICOCLAW_CHANNELS_MQTT_TLS_CERT"` + TLSKey string `json:"tls_key" env:"PICOCLAW_CHANNELS_MQTT_TLS_KEY"` + QoS int `json:"qos" env:"PICOCLAW_CHANNELS_MQTT_QOS"` + Retain bool `json:"retain" env:"PICOCLAW_CHANNELS_MQTT_RETAIN"` + Prefix string `json:"prefix" env:"PICOCLAW_CHANNELS_MQTT_PREFIX"` + Instruction string `json:"instruction" env:"PICOCLAW_CHANNELS_MQTT_INSTRUCTION"` + AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_MQTT_ALLOW_FROM"` + GroupTrigger GroupTriggerConfig `json:"group_trigger,omitempty"` + ReasoningChannelID string `json:"reasoning_channel_id" env:"PICOCLAW_CHANNELS_MQTT_REASONING_CHANNEL_ID"` +} + type HeartbeatConfig struct { Enabled bool `json:"enabled" env:"PICOCLAW_HEARTBEAT_ENABLED"` Interval int `json:"interval" env:"PICOCLAW_HEARTBEAT_INTERVAL"` // minutes, min 5 diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 9a2706b3b..d3ac2698d 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -27,6 +27,7 @@ import ( _ "github.com/sipeed/picoclaw/pkg/channels/slack" _ "github.com/sipeed/picoclaw/pkg/channels/telegram" _ "github.com/sipeed/picoclaw/pkg/channels/wecom" + _ "github.com/sipeed/picoclaw/pkg/channels/mqtt" _ "github.com/sipeed/picoclaw/pkg/channels/whatsapp" _ "github.com/sipeed/picoclaw/pkg/channels/whatsapp_native" "github.com/sipeed/picoclaw/pkg/config" From 53dc12c8e35cfe157907f975147a8d64b10066e3 Mon Sep 17 00:00:00 2001 From: avaksru Date: Tue, 17 Mar 2026 18:24:50 +0300 Subject: [PATCH 02/10] JSON format --- config/config.example.json | 2 + docs/channels/mqtt/README.md | 68 ++++++++++++++++++++- pkg/channels/mqtt/mqtt.go | 114 +++++++++++++---------------------- pkg/config/config.go | 2 + 4 files changed, 112 insertions(+), 74 deletions(-) diff --git a/config/config.example.json b/config/config.example.json index 43bbb9ad8..4868653c2 100644 --- a/config/config.example.json +++ b/config/config.example.json @@ -271,7 +271,9 @@ "subscribe_topics": [ "picoclaw/input" ], + "subscribe_json_key": null, "reply_topic": "picoclaw/output", + "reply_json_key": null, "tls": false, "tls_ca": "", "tls_cert": "", diff --git a/docs/channels/mqtt/README.md b/docs/channels/mqtt/README.md index 370c44ffa..a54eeeeba 100644 --- a/docs/channels/mqtt/README.md +++ b/docs/channels/mqtt/README.md @@ -16,8 +16,12 @@ Add this to `config.json`: "qos": 1, "retain": false, "tls": false, - "subscribe_topics": ["picoclaw/chat"], - "reply_topic": "picoclaw/reply", + "subscribe_topics": [ + "picoclaw/input" + ], + "subscribe_json_key": null, + "reply_topic": "picoclaw/output", + "reply_json_key": null, "allow_from": [], "group_trigger": { "mention_only": true @@ -42,7 +46,9 @@ Add this to `config.json`: | retain | bool | No | Whether to retain messages. Default: false | | tls | bool | No | Enable TLS/SSL connection. Default: false | | subscribe_topics | []string | Yes | List of MQTT topics to subscribe to for incoming messages | +| subscribe_json_key | string | No | JSON key to extract from incoming messages. If null, treats message as plain text | | reply_topic | string | No | Topic to publish replies to. Supports placeholders: `{client_id}`, `{topic}` | +| reply_json_key | string | No | JSON key to use when sending replies. If null, sends as plain text | | allow_from | []string | No | Client ID whitelist (empty allows all) | | group_trigger | object | No | Group trigger strategy (`mention_only` / `prefixes`) | | reasoning_channel_id | string | No | Target channel for reasoning output | @@ -54,6 +60,8 @@ Add this to `config.json`: - JSON format: `{"status": "your message"}` - Plain text: Direct text content - Automatic JSON parsing with fallback to plain text for malformed JSON +- **JSON Key Extraction**: When `subscribe_json_key` is set, extracts specific field from JSON messages +- **JSON Response Formatting**: When `reply_json_key` is set, sends responses as JSON with specified key - **Authentication**: Username/password authentication support - **TLS/SSL**: Secure connections with TLS configuration - **Quality of Service**: Configurable QoS levels (0, 1, 2) @@ -67,6 +75,7 @@ Add this to `config.json`: - **Robust Message Handling**: Intelligent parsing that handles malformed JSON gracefully - **Flexible Topic Configuration**: Support for multiple input topics and dynamic reply topics +- **JSON Message Processing**: Configurable JSON key extraction and response formatting - **Connection Resilience**: Automatic reconnection with configurable retry intervals - **Security**: TLS support and authentication for secure communication - **Message Routing**: Support for reasoning channel routing and group trigger rules @@ -77,4 +86,57 @@ Add this to `config.json`: - Reply topics can use placeholders to dynamically route responses - Client IDs are used as sender identifiers in the messaging system - Topics are treated as channels for message routing purposes -- The instruction field allows adding context or commands to all incoming messages \ No newline at end of file +- The instruction field allows adding context or commands to all incoming messages + +## 6. JSON Configuration Examples + +### Plain Text Mode (Default) +```json +{ + "subscribe_json_key": null, + "reply_json_key": null +} +``` +- Incoming messages are treated as plain text +- Outgoing messages are sent as plain text + +### JSON Input Mode +```json +{ + "subscribe_json_key": "message", + "reply_json_key": null +} +``` +- Incoming JSON: `{"message": "Hello world", "timestamp": 1234567890}` +- Extracted content: `"Hello world"` +- Outgoing messages are sent as plain text + +### JSON Output Mode +```json +{ + "subscribe_json_key": null, + "reply_json_key": "response" +} +``` +- Incoming messages are treated as plain text +- Outgoing JSON: `{"response": "Bot reply"}` + +### Full JSON Mode +```json +{ + "subscribe_json_key": "input", + "reply_json_key": "output" +} +``` +- Incoming JSON: `{"input": "What's the weather?", "location": "Moscow"}` +- Extracted content: `"What's the weather?"` +- Outgoing JSON: `{"output": "The weather is sunny"}` + +## 7. Troubleshooting + +### Common Issues + +1. **JSON parsing fails**: Ensure your JSON messages are valid +2. **Key not found**: Verify the JSON key exists in your messages +3. **Connection issues**: Check broker URL, credentials, and TLS settings +4. **Permission denied**: Verify client ID is in the `allow_from` list if configured diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index d6efe040d..77a131e8f 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -154,77 +154,40 @@ func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { "payload": string(msg.Payload()), }) - // Try to parse as JSON first - var mqttMsg MQTTMessage var content string - var err error - // First try to parse as JSON - if err = json.Unmarshal(msg.Payload(), &mqttMsg); err == nil { - // Successfully parsed as JSON - content = mqttMsg.Status - } else { - // If JSON parsing fails, try to clean up common malformed JSON issues - payloadStr := string(msg.Payload()) - - // Try to extract JSON from malformed strings (e.g., extra quotes or braces) - // Look for a valid JSON object within the string - if strings.HasPrefix(payloadStr, "{") && (strings.HasSuffix(payloadStr, "}") || strings.HasSuffix(payloadStr, "}\"")) { - // Try to parse as-is first - if err2 := json.Unmarshal(msg.Payload(), &mqttMsg); err2 == nil { - content = mqttMsg.Status + // Check if subscribe_json_key is configured + if c.config.SubscribeJSONKey != nil && *c.config.SubscribeJSONKey != "" { + // Parse as JSON and extract the specified key + var jsonMsg map[string]interface{} + if err := json.Unmarshal(msg.Payload(), &jsonMsg); err == nil { + // Successfully parsed as JSON + if value, exists := jsonMsg[*c.config.SubscribeJSONKey]; exists { + content = fmt.Sprintf("%v", value) + logger.InfoCF("mqtt", "Extracted JSON value", map[string]any{ + "key": *c.config.SubscribeJSONKey, + "content": content, + }) } else { - // Try to clean up common issues - cleaned := strings.TrimSpace(payloadStr) - - // Remove extra quotes and braces from the end - for strings.HasSuffix(cleaned, "}") && strings.Count(cleaned, "{") < strings.Count(cleaned, "}") { - cleaned = cleaned[:len(cleaned)-1] - } - for strings.HasSuffix(cleaned, "\"}") && strings.Count(cleaned, "{") < strings.Count(cleaned, "}") { - cleaned = cleaned[:len(cleaned)-1] - } - // Remove any trailing quotes (simple check for extra quotes at the end) - for strings.HasSuffix(cleaned, "\"") && !strings.HasSuffix(cleaned, "\"}") { - cleaned = cleaned[:len(cleaned)-1] - } - - // Remove extra opening braces - for strings.HasPrefix(cleaned, "{") && strings.Count(cleaned, "{") > strings.Count(cleaned, "}") { - cleaned = cleaned[1:] - } - - if cleaned != payloadStr { - if err3 := json.Unmarshal([]byte(cleaned), &mqttMsg); err3 == nil { - content = mqttMsg.Status - logger.InfoCF("mqtt", "Successfully parsed cleaned JSON", map[string]any{ - "original": payloadStr, - "cleaned": cleaned, - }) - } else { - // Fall back to plain text - content = payloadStr - logger.InfoCF("mqtt", "Received plain text message (JSON parsing failed)", map[string]any{ - "error": err.Error(), - "payload": content, - }) - } - } else { - // Fall back to plain text - content = payloadStr - logger.InfoCF("mqtt", "Received plain text message (JSON parsing failed)", map[string]any{ - "error": err.Error(), - "payload": content, - }) - } + logger.WarnCF("mqtt", "JSON key not found in message", map[string]any{ + "key": *c.config.SubscribeJSONKey, + }) + content = string(msg.Payload()) // Fall back to raw payload } } else { - // Not JSON-like, treat as plain text - content = payloadStr - logger.InfoCF("mqtt", "Received plain text message (not JSON-like)", map[string]any{ - "payload": content, + // JSON parsing failed, treat as plain text + logger.InfoCF("mqtt", "JSON parsing failed, treating as plain text", map[string]any{ + "error": err.Error(), + "payload": string(msg.Payload()), }) + content = string(msg.Payload()) } + } else { + // No JSON key configured, treat as plain text + content = string(msg.Payload()) + logger.InfoCF("mqtt", "Received plain text message", map[string]any{ + "payload": content, + }) } if content == "" { @@ -292,13 +255,22 @@ func (c *MQTTChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { replyTopic = strings.ReplaceAll(replyTopic, "{topic}", msg.ChatID) // Add more placeholders as needed - mqttMsg := MQTTMessage{ - Status: msg.Content, - } + var payload []byte + var err error - payload, err := json.Marshal(mqttMsg) - if err != nil { - return fmt.Errorf("failed to marshal MQTT message: %w", err) + // Check if reply_json_key is configured + if c.config.ReplyJSONKey != nil && *c.config.ReplyJSONKey != "" { + // Send as JSON with the specified key + jsonMsg := map[string]string{ + *c.config.ReplyJSONKey: msg.Content, + } + payload, err = json.Marshal(jsonMsg) + if err != nil { + return fmt.Errorf("failed to marshal MQTT JSON message: %w", err) + } + } else { + // Send as plain text + payload = []byte(msg.Content) } token := c.client.Publish(replyTopic, byte(c.config.QoS), c.config.Retain, payload) @@ -311,4 +283,4 @@ func (c *MQTTChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { "payload": string(payload), }) return nil -} \ No newline at end of file +} diff --git a/pkg/config/config.go b/pkg/config/config.go index f840e4d2f..1d5a864fa 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -549,7 +549,9 @@ type MQTTConfig struct { Username string `json:"username" env:"PICOCLAW_CHANNELS_MQTT_USERNAME"` Password string `json:"password" env:"PICOCLAW_CHANNELS_MQTT_PASSWORD"` SubscribeTopics []string `json:"subscribe_topics" env:"PICOCLAW_CHANNELS_MQTT_SUBSCRIBE_TOPICS"` + SubscribeJSONKey *string `json:"subscribe_json_key,omitempty"` ReplyTopic string `json:"reply_topic" env:"PICOCLAW_CHANNELS_MQTT_REPLY_TOPIC"` + ReplyJSONKey *string `json:"reply_json_key,omitempty"` TLS bool `json:"tls" env:"PICOCLAW_CHANNELS_MQTT_TLS"` TLSCA string `json:"tls_ca" env:"PICOCLAW_CHANNELS_MQTT_TLS_CA"` TLSCert string `json:"tls_cert" env:"PICOCLAW_CHANNELS_MQTT_TLS_CERT"` From 858146213c8319cb90c40602981d8433956dfcf5 Mon Sep 17 00:00:00 2001 From: avaksru Date: Tue, 17 Mar 2026 18:36:45 +0300 Subject: [PATCH 03/10] mqtt reconnect --- pkg/channels/mqtt/mqtt.go | 76 +++++++++++++++++++++++++++++++-------- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index 77a131e8f..4eece637c 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -99,6 +99,33 @@ func (c *MQTTChannel) Start(ctx context.Context) error { // Set message handler opts.SetDefaultPublishHandler(c.onMessage) + // Set connection lost handler + opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { + logger.ErrorCF("mqtt", "Connection lost", map[string]any{ + "error": err, + }) + // Connection will be automatically reconnected by the client + }) + + // Set connect handler + opts.SetOnConnectHandler(func(client mqtt.Client) { + logger.InfoC("mqtt", "Connected to MQTT broker") + + // Subscribe to topics after successful connection + for _, topic := range c.config.SubscribeTopics { + if token := c.client.Subscribe(topic, byte(c.config.QoS), nil); token.Wait() && token.Error() != nil { + logger.ErrorCF("mqtt", "Failed to subscribe to topic", map[string]any{ + "topic": topic, + "error": token.Error(), + }) + } else { + logger.InfoCF("mqtt", "Subscribed to topic", map[string]any{ + "topic": topic, + }) + } + } + }) + c.client = mqtt.NewClient(opts) // Connect with retry @@ -111,22 +138,12 @@ func (c *MQTTChannel) Start(ctx context.Context) error { "client_id": c.config.ClientID, }) - // Subscribe to topics - for _, topic := range c.config.SubscribeTopics { - if token := c.client.Subscribe(topic, byte(c.config.QoS), nil); token.Wait() && token.Error() != nil { - logger.ErrorCF("mqtt", "Failed to subscribe to topic", map[string]any{ - "topic": topic, - "error": token.Error(), - }) - } else { - logger.InfoCF("mqtt", "Subscribed to topic", map[string]any{ - "topic": topic, - }) - } - } - c.SetRunning(true) logger.InfoC("mqtt", "MQTT channel started") + + // Start periodic connection health check + go c.startHealthCheck() + return nil } @@ -147,6 +164,37 @@ func (c *MQTTChannel) Stop(ctx context.Context) error { return nil } +// startHealthCheck starts a periodic health check for the MQTT connection. +func (c *MQTTChannel) startHealthCheck() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + if c.client != nil && c.client.IsConnected() { + // Connection is healthy + continue + } + + // Connection is lost, attempt to reconnect + logger.WarnC("mqtt", "MQTT connection lost, attempting to reconnect") + + // Try to reconnect + if token := c.client.Connect(); token.Wait() && token.Error() != nil { + logger.ErrorCF("mqtt", "Failed to reconnect to MQTT broker", map[string]any{ + "error": token.Error(), + }) + // Continue the loop to try again later + } else { + logger.InfoC("mqtt", "Successfully reconnected to MQTT broker") + } + } + } +} + // onMessage handles incoming MQTT messages. func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { logger.DebugCF("mqtt", "Received message", map[string]any{ From 3a96ea02b0afd2e598b707a48a79cd8847f8161e Mon Sep 17 00:00:00 2001 From: avaksru Date: Wed, 18 Mar 2026 12:54:31 +0300 Subject: [PATCH 04/10] removed redundant Health Check --- pkg/channels/mqtt/mqtt.go | 35 +---------------------------------- 1 file changed, 1 insertion(+), 34 deletions(-) diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index 4eece637c..3d1cec8b1 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -74,7 +74,7 @@ func (c *MQTTChannel) Start(ctx context.Context) error { opts.SetAutoReconnect(true) opts.SetConnectRetry(true) opts.SetConnectRetryInterval(5 * time.Second) - opts.SetMaxReconnectInterval(5 * time.Minute) + opts.SetMaxReconnectInterval(1 * time.Minute) opts.SetKeepAlive(60 * time.Second) opts.SetPingTimeout(10 * time.Second) opts.SetWriteTimeout(10 * time.Second) @@ -141,9 +141,6 @@ func (c *MQTTChannel) Start(ctx context.Context) error { c.SetRunning(true) logger.InfoC("mqtt", "MQTT channel started") - // Start periodic connection health check - go c.startHealthCheck() - return nil } @@ -164,36 +161,6 @@ func (c *MQTTChannel) Stop(ctx context.Context) error { return nil } -// startHealthCheck starts a periodic health check for the MQTT connection. -func (c *MQTTChannel) startHealthCheck() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-c.ctx.Done(): - return - case <-ticker.C: - if c.client != nil && c.client.IsConnected() { - // Connection is healthy - continue - } - - // Connection is lost, attempt to reconnect - logger.WarnC("mqtt", "MQTT connection lost, attempting to reconnect") - - // Try to reconnect - if token := c.client.Connect(); token.Wait() && token.Error() != nil { - logger.ErrorCF("mqtt", "Failed to reconnect to MQTT broker", map[string]any{ - "error": token.Error(), - }) - // Continue the loop to try again later - } else { - logger.InfoC("mqtt", "Successfully reconnected to MQTT broker") - } - } - } -} // onMessage handles incoming MQTT messages. func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { From d96d8810339115c4b314140dfe2685298281ef12 Mon Sep 17 00:00:00 2001 From: avaksru Date: Wed, 18 Mar 2026 13:04:37 +0300 Subject: [PATCH 05/10] certificate loading --- pkg/channels/mqtt/mqtt.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index 3d1cec8b1..4c770bed5 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -3,8 +3,10 @@ package mqtt import ( "context" "crypto/tls" + "crypto/x509" "encoding/json" "fmt" + "os" "strings" "time" @@ -85,14 +87,29 @@ func (c *MQTTChannel) Start(ctx context.Context) error { tlsConfig := &tls.Config{ InsecureSkipVerify: false, } + + // Load CA certificate if provided if c.config.TLSCA != "" { - // Load CA cert if provided - // For simplicity, assuming file path - // In production, load cert properly + caCert, err := os.ReadFile(c.config.TLSCA) + if err != nil { + return fmt.Errorf("failed to read CA certificate: %w", err) + } + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return fmt.Errorf("failed to parse CA certificate") + } + tlsConfig.RootCAs = caCertPool } + + // Load client certificate and key if provided if c.config.TLSCert != "" && c.config.TLSKey != "" { - // Load client cert + cert, err := tls.LoadX509KeyPair(c.config.TLSCert, c.config.TLSKey) + if err != nil { + return fmt.Errorf("failed to load client certificate and key: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{cert} } + opts.SetTLSConfig(tlsConfig) } From 28d9bd852dea6aac07df418cd31d918a4c3aeb65 Mon Sep 17 00:00:00 2001 From: avaksru Date: Wed, 18 Mar 2026 13:21:35 +0300 Subject: [PATCH 06/10] safety fix --- pkg/channels/mqtt/mqtt.go | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index 4c770bed5..08809d16e 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -129,8 +129,11 @@ func (c *MQTTChannel) Start(ctx context.Context) error { logger.InfoC("mqtt", "Connected to MQTT broker") // Subscribe to topics after successful connection + var subscriptionErrors []string for _, topic := range c.config.SubscribeTopics { if token := c.client.Subscribe(topic, byte(c.config.QoS), nil); token.Wait() && token.Error() != nil { + errMsg := fmt.Sprintf("topic %s: %v", topic, token.Error()) + subscriptionErrors = append(subscriptionErrors, errMsg) logger.ErrorCF("mqtt", "Failed to subscribe to topic", map[string]any{ "topic": topic, "error": token.Error(), @@ -141,12 +144,22 @@ func (c *MQTTChannel) Start(ctx context.Context) error { }) } } + + // Log subscription errors summary + if len(subscriptionErrors) > 0 { + logger.ErrorCF("mqtt", "Subscription errors occurred", map[string]any{ + "errors": subscriptionErrors, + "count": len(subscriptionErrors), + }) + } }) c.client = mqtt.NewClient(opts) // Connect with retry if token := c.client.Connect(); token.Wait() && token.Error() != nil { + // Clean up client on connection failure + c.client = nil return fmt.Errorf("mqtt connect failed: %w", token.Error()) } @@ -236,11 +249,18 @@ func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { // Check if message has reply-to header (in payload or topic) replyTopic := c.config.ReplyTopic if strings.Contains(content, "reply-to:") { - // Simple parsing for reply-to - parts := strings.SplitN(content, "reply-to:", 2) - if len(parts) == 2 { - replyTopic = strings.TrimSpace(parts[1]) - content = strings.TrimSpace(parts[0]) + // Improved parsing for reply-to: extract the last occurrence + lastIndex := strings.LastIndex(content, "reply-to:") + if lastIndex != -1 { + replyTopicPart := content[lastIndex+len("reply-to:"):] + // Find the end of the reply-to value (newline or end of string) + if newlineIndex := strings.Index(replyTopicPart, "\n"); newlineIndex != -1 { + replyTopic = strings.TrimSpace(replyTopicPart[:newlineIndex]) + content = strings.TrimSpace(content[:lastIndex]) + strings.TrimSpace(replyTopicPart[newlineIndex:]) + } else { + replyTopic = strings.TrimSpace(replyTopicPart) + content = strings.TrimSpace(content[:lastIndex]) + } } } @@ -285,8 +305,15 @@ func (c *MQTTChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { // Replace placeholders in reply topic replyTopic = strings.ReplaceAll(replyTopic, "{client_id}", c.config.ClientID) replyTopic = strings.ReplaceAll(replyTopic, "{topic}", msg.ChatID) + replyTopic = strings.ReplaceAll(replyTopic, "{timestamp}", fmt.Sprintf("%d", time.Now().Unix())) + replyTopic = strings.ReplaceAll(replyTopic, "{message_id}", msg.ReplyToMessageID) // Add more placeholders as needed + // Validate reply topic + if replyTopic == "" { + return fmt.Errorf("reply topic is empty and no default configured") + } + var payload []byte var err error From 132f4b315400d86827b5dfcbf6447415a8cd3c7c Mon Sep 17 00:00:00 2001 From: avaksru Date: Thu, 19 Mar 2026 10:18:35 +0300 Subject: [PATCH 07/10] paho v1.5.1 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1376ae687..2b99d9fc1 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/anthropics/anthropic-sdk-go v1.26.0 github.com/bwmarrin/discordgo v0.29.0 github.com/caarlos0/env/v11 v11.4.0 + github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/ergochat/irc-go v0.6.0 - github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/ergochat/readline v0.1.3 github.com/gdamore/tcell/v2 v2.13.8 github.com/gomarkdown/markdown v0.0.0-20260217112301-37c66b85d6ab diff --git a/go.sum b/go.sum index cb338e77e..9ac3f9f51 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= -github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/elliotchance/orderedmap/v3 v3.1.0 h1:j4DJ5ObEmMBt/lcwIecKcoRxIQUEnw0L804lXYDt/pg= github.com/elliotchance/orderedmap/v3 v3.1.0/go.mod h1:G+Hc2RwaZvJMcS4JpGCOyViCnGeKf0bTYCGTO4uhjSo= github.com/ergochat/irc-go v0.6.0 h1:Y0AGV76aeihJfCtLaQh+OyJKFiKGrYC0VTkeMZ6XW28= From 5097afd079f3785642d0a5fbd2c7ace560e99f2f Mon Sep 17 00:00:00 2001 From: avaksru Date: Thu, 19 Mar 2026 10:28:05 +0300 Subject: [PATCH 08/10] gci-fmt --- go.mod | 7 - pkg/channels/manager.go | 53 ++++- pkg/channels/mqtt/init.go | 9 +- pkg/channels/mqtt/mqtt.go | 24 +- pkg/config/config.go | 76 ++++--- pkg/gateway/gateway.go | 448 ++++++++++++++++++++++++++++++++++++-- 6 files changed, 535 insertions(+), 82 deletions(-) diff --git a/go.mod b/go.mod index 2b99d9fc1..65c561d91 100644 --- a/go.mod +++ b/go.mod @@ -94,15 +94,8 @@ require ( github.com/valyala/fastjson v1.6.10 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect golang.org/x/arch v0.24.0 // indirect -<<<<<<< HEAD golang.org/x/crypto v0.49.0 golang.org/x/net v0.52.0 golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.42.0 // indirect -======= - golang.org/x/crypto v0.48.0 - golang.org/x/net v0.51.0 - golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.41.0 // indirect ->>>>>>> 1038a05 (channel mqtt) ) diff --git a/pkg/channels/manager.go b/pkg/channels/manager.go index 7a1e090e8..8e197e1de 100644 --- a/pkg/channels/manager.go +++ b/pkg/channels/manager.go @@ -158,8 +158,13 @@ func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) { } // preSend handles typing stop, reaction undo, and placeholder editing before sending a message. -// Returns true if the message was already delivered (skip Send). -func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool { +// Returns true if the message was edited into a placeholder (skip Send). +func (m *Manager) preSend( + ctx context.Context, + name string, + msg bus.OutboundMessage, + ch Channel, +) bool { key := name + ":" + msg.ChatID // 1. Stop typing @@ -206,7 +211,11 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess return false } -func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.MediaStore) (*Manager, error) { +func NewManager( + cfg *config.Config, + messageBus *bus.MessageBus, + store media.MediaStore, +) (*Manager, error) { m := &Manager{ channels: make(map[string]Channel), workers: make(map[string]*channelWorker), @@ -397,7 +406,9 @@ func (m *Manager) initChannels(channels *config.ChannelsConfig) error { m.initChannel("irc", "IRC") } - if m.config.Channels.MQTT.Enabled && m.config.Channels.MQTT.Broker != "" && m.config.Channels.MQTT.ClientID != "" && len(m.config.Channels.MQTT.SubscribeTopics) > 0 { + if m.config.Channels.MQTT.Enabled && m.config.Channels.MQTT.Broker != "" && + m.config.Channels.MQTT.ClientID != "" && + len(m.config.Channels.MQTT.SubscribeTopics) > 0 { m.initChannel("mqtt", "MQTT") } @@ -619,7 +630,12 @@ func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker) // - ErrNotRunning / ErrSendFailed: permanent, no retry // - ErrRateLimit: fixed delay retry // - ErrTemporary / unknown: exponential backoff retry -func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMessage) { +func (m *Manager) sendWithRetry( + ctx context.Context, + name string, + w *channelWorker, + msg bus.OutboundMessage, +) { // Rate limit: wait for token if err := w.limiter.Wait(ctx); err != nil { // ctx canceled, shutting down @@ -659,7 +675,10 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork } // ErrTemporary or unknown error — exponential backoff - backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff) + backoff := min( + time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), + maxBackoff, + ) select { case <-time.After(backoff): case <-ctx.Done(): @@ -784,12 +803,21 @@ func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWor // sendMediaWithRetry sends a media message through the channel with rate limiting and // retry logic. If the channel does not implement MediaSender, it silently skips. -func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMediaMessage) { +func (m *Manager) sendMediaWithRetry( + ctx context.Context, + name string, + w *channelWorker, + msg bus.OutboundMediaMessage, +) { ms, ok := w.ch.(MediaSender) if !ok { - logger.DebugCF("channels", "Channel does not support MediaSender, skipping media", map[string]any{ - "channel": name, - }) + logger.DebugCF( + "channels", + "Channel does not support MediaSender, skipping media", + map[string]any{ + "channel": name, + }, + ) return } @@ -826,7 +854,10 @@ func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channe } // ErrTemporary or unknown error — exponential backoff - backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff) + backoff := min( + time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), + maxBackoff, + ) select { case <-time.After(backoff): case <-ctx.Done(): diff --git a/pkg/channels/mqtt/init.go b/pkg/channels/mqtt/init.go index c5f77713f..ef0acf181 100644 --- a/pkg/channels/mqtt/init.go +++ b/pkg/channels/mqtt/init.go @@ -7,10 +7,17 @@ import ( ) func init() { + channels.RegisterFactory("mqtt", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) { + if !cfg.Channels.MQTT.Enabled { + return nil, nil + } + return NewMQTTChannel(cfg.Channels.MQTT, b) + }) -} \ No newline at end of file + +} diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index 08809d16e..fdaaabd90 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -11,7 +11,6 @@ import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/channels" "github.com/sipeed/picoclaw/pkg/config" @@ -87,7 +86,7 @@ func (c *MQTTChannel) Start(ctx context.Context) error { tlsConfig := &tls.Config{ InsecureSkipVerify: false, } - + // Load CA certificate if provided if c.config.TLSCA != "" { caCert, err := os.ReadFile(c.config.TLSCA) @@ -100,7 +99,7 @@ func (c *MQTTChannel) Start(ctx context.Context) error { } tlsConfig.RootCAs = caCertPool } - + // Load client certificate and key if provided if c.config.TLSCert != "" && c.config.TLSKey != "" { cert, err := tls.LoadX509KeyPair(c.config.TLSCert, c.config.TLSKey) @@ -109,7 +108,7 @@ func (c *MQTTChannel) Start(ctx context.Context) error { } tlsConfig.Certificates = []tls.Certificate{cert} } - + opts.SetTLSConfig(tlsConfig) } @@ -127,7 +126,7 @@ func (c *MQTTChannel) Start(ctx context.Context) error { // Set connect handler opts.SetOnConnectHandler(func(client mqtt.Client) { logger.InfoC("mqtt", "Connected to MQTT broker") - + // Subscribe to topics after successful connection var subscriptionErrors []string for _, topic := range c.config.SubscribeTopics { @@ -144,7 +143,7 @@ func (c *MQTTChannel) Start(ctx context.Context) error { }) } } - + // Log subscription errors summary if len(subscriptionErrors) > 0 { logger.ErrorCF("mqtt", "Subscription errors occurred", map[string]any{ @@ -164,13 +163,13 @@ func (c *MQTTChannel) Start(ctx context.Context) error { } logger.InfoCF("mqtt", "Connected to MQTT broker", map[string]any{ - "broker": c.config.Broker, + "broker": c.config.Broker, "client_id": c.config.ClientID, }) c.SetRunning(true) logger.InfoC("mqtt", "MQTT channel started") - + return nil } @@ -191,7 +190,6 @@ func (c *MQTTChannel) Stop(ctx context.Context) error { return nil } - // onMessage handles incoming MQTT messages. func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { logger.DebugCF("mqtt", "Received message", map[string]any{ @@ -200,7 +198,7 @@ func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { }) var content string - + // Check if subscribe_json_key is configured if c.config.SubscribeJSONKey != nil && *c.config.SubscribeJSONKey != "" { // Parse as JSON and extract the specified key @@ -277,8 +275,8 @@ func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { messageID := fmt.Sprintf("mqtt-%d", time.Now().UnixNano()) metadata := map[string]string{ - "platform": "mqtt", - "topic": msg.Topic(), + "platform": "mqtt", + "topic": msg.Topic(), "reply_topic": replyTopic, } @@ -313,7 +311,7 @@ func (c *MQTTChannel) Send(ctx context.Context, msg bus.OutboundMessage) error { if replyTopic == "" { return fmt.Errorf("reply topic is empty and no default configured") } - + var payload []byte var err error diff --git a/pkg/config/config.go b/pkg/config/config.go index 1d5a864fa..babbe0ef0 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -543,26 +543,26 @@ type IRCConfig struct { } type MQTTConfig struct { - Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_MQTT_ENABLED"` - Broker string `json:"broker" env:"PICOCLAW_CHANNELS_MQTT_BROKER"` - ClientID string `json:"client_id" env:"PICOCLAW_CHANNELS_MQTT_CLIENT_ID"` - Username string `json:"username" env:"PICOCLAW_CHANNELS_MQTT_USERNAME"` - Password string `json:"password" env:"PICOCLAW_CHANNELS_MQTT_PASSWORD"` - SubscribeTopics []string `json:"subscribe_topics" env:"PICOCLAW_CHANNELS_MQTT_SUBSCRIBE_TOPICS"` + Enabled bool `json:"enabled" env:"PICOCLAW_CHANNELS_MQTT_ENABLED"` + Broker string `json:"broker" env:"PICOCLAW_CHANNELS_MQTT_BROKER"` + ClientID string `json:"client_id" env:"PICOCLAW_CHANNELS_MQTT_CLIENT_ID"` + Username string `json:"username" env:"PICOCLAW_CHANNELS_MQTT_USERNAME"` + Password string `json:"password" env:"PICOCLAW_CHANNELS_MQTT_PASSWORD"` + SubscribeTopics []string `json:"subscribe_topics" env:"PICOCLAW_CHANNELS_MQTT_SUBSCRIBE_TOPICS"` SubscribeJSONKey *string `json:"subscribe_json_key,omitempty"` - ReplyTopic string `json:"reply_topic" env:"PICOCLAW_CHANNELS_MQTT_REPLY_TOPIC"` + ReplyTopic string `json:"reply_topic" env:"PICOCLAW_CHANNELS_MQTT_REPLY_TOPIC"` ReplyJSONKey *string `json:"reply_json_key,omitempty"` - TLS bool `json:"tls" env:"PICOCLAW_CHANNELS_MQTT_TLS"` - TLSCA string `json:"tls_ca" env:"PICOCLAW_CHANNELS_MQTT_TLS_CA"` - TLSCert string `json:"tls_cert" env:"PICOCLAW_CHANNELS_MQTT_TLS_CERT"` - TLSKey string `json:"tls_key" env:"PICOCLAW_CHANNELS_MQTT_TLS_KEY"` - QoS int `json:"qos" env:"PICOCLAW_CHANNELS_MQTT_QOS"` - Retain bool `json:"retain" env:"PICOCLAW_CHANNELS_MQTT_RETAIN"` - Prefix string `json:"prefix" env:"PICOCLAW_CHANNELS_MQTT_PREFIX"` - Instruction string `json:"instruction" env:"PICOCLAW_CHANNELS_MQTT_INSTRUCTION"` - AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_MQTT_ALLOW_FROM"` + TLS bool `json:"tls" env:"PICOCLAW_CHANNELS_MQTT_TLS"` + TLSCA string `json:"tls_ca" env:"PICOCLAW_CHANNELS_MQTT_TLS_CA"` + TLSCert string `json:"tls_cert" env:"PICOCLAW_CHANNELS_MQTT_TLS_CERT"` + TLSKey string `json:"tls_key" env:"PICOCLAW_CHANNELS_MQTT_TLS_KEY"` + QoS int `json:"qos" env:"PICOCLAW_CHANNELS_MQTT_QOS"` + Retain bool `json:"retain" env:"PICOCLAW_CHANNELS_MQTT_RETAIN"` + Prefix string `json:"prefix" env:"PICOCLAW_CHANNELS_MQTT_PREFIX"` + Instruction string `json:"instruction" env:"PICOCLAW_CHANNELS_MQTT_INSTRUCTION"` + AllowFrom FlexibleStringSlice `json:"allow_from" env:"PICOCLAW_CHANNELS_MQTT_ALLOW_FROM"` GroupTrigger GroupTriggerConfig `json:"group_trigger,omitempty"` - ReasoningChannelID string `json:"reasoning_channel_id" env:"PICOCLAW_CHANNELS_MQTT_REASONING_CHANNEL_ID"` + ReasoningChannelID string `json:"reasoning_channel_id" env:"PICOCLAW_CHANNELS_MQTT_REASONING_CHANNEL_ID"` } type HeartbeatConfig struct { @@ -755,9 +755,9 @@ type SearXNGConfig struct { } type GLMSearchConfig struct { - Enabled bool `json:"enabled" env:"PICOCLAW_TOOLS_WEB_GLM_ENABLED"` - APIKey string `json:"api_key" env:"PICOCLAW_TOOLS_WEB_GLM_API_KEY"` - BaseURL string `json:"base_url" env:"PICOCLAW_TOOLS_WEB_GLM_BASE_URL"` + Enabled bool `json:"enabled" env:"PICOCLAW_TOOLS_WEB_GLM_ENABLED"` + APIKey string `json:"api_key" env:"PICOCLAW_TOOLS_WEB_GLM_API_KEY"` + BaseURL string `json:"base_url" env:"PICOCLAW_TOOLS_WEB_GLM_BASE_URL"` // SearchEngine specifies the search backend: "search_std" (default), // "search_pro", "search_pro_sogou", or "search_pro_quark". SearchEngine string `json:"search_engine" env:"PICOCLAW_TOOLS_WEB_GLM_SEARCH_ENGINE"` @@ -765,7 +765,7 @@ type GLMSearchConfig struct { } type WebToolsConfig struct { - ToolConfig ` envPrefix:"PICOCLAW_TOOLS_WEB_"` + ToolConfig ` envPrefix:"PICOCLAW_TOOLS_WEB_"` Brave BraveConfig ` json:"brave"` Tavily TavilyConfig ` json:"tavily"` DuckDuckGo DuckDuckGoConfig ` json:"duckduckgo"` @@ -777,13 +777,13 @@ type WebToolsConfig struct { // the client-side web_search tool is hidden to avoid duplicate search surfaces, // and the provider's built-in search is used instead. Falls back to client-side // search when the provider does not support native search. - PreferNative bool `json:"prefer_native" env:"PICOCLAW_TOOLS_WEB_PREFER_NATIVE"` + PreferNative bool ` json:"prefer_native" env:"PICOCLAW_TOOLS_WEB_PREFER_NATIVE"` // Proxy is an optional proxy URL for web tools (http/https/socks5/socks5h). // For authenticated proxies, prefer HTTP_PROXY/HTTPS_PROXY env vars instead of embedding credentials in config. - Proxy string `json:"proxy,omitempty" env:"PICOCLAW_TOOLS_WEB_PROXY"` - FetchLimitBytes int64 `json:"fetch_limit_bytes,omitempty" env:"PICOCLAW_TOOLS_WEB_FETCH_LIMIT_BYTES"` - Format string `json:"format,omitempty" env:"PICOCLAW_TOOLS_WEB_FORMAT"` - PrivateHostWhitelist FlexibleStringSlice `json:"private_host_whitelist,omitempty" env:"PICOCLAW_TOOLS_WEB_PRIVATE_HOST_WHITELIST"` + Proxy string ` json:"proxy,omitempty" env:"PICOCLAW_TOOLS_WEB_PROXY"` + FetchLimitBytes int64 ` json:"fetch_limit_bytes,omitempty" env:"PICOCLAW_TOOLS_WEB_FETCH_LIMIT_BYTES"` + Format string ` json:"format,omitempty" env:"PICOCLAW_TOOLS_WEB_FORMAT"` + PrivateHostWhitelist FlexibleStringSlice ` json:"private_host_whitelist,omitempty" env:"PICOCLAW_TOOLS_WEB_PRIVATE_HOST_WHITELIST"` } type CronToolsConfig struct { @@ -898,10 +898,10 @@ type MCPServerConfig struct { // MCPConfig defines configuration for all MCP servers type MCPConfig struct { - ToolConfig ` envPrefix:"PICOCLAW_TOOLS_MCP_"` + ToolConfig ` envPrefix:"PICOCLAW_TOOLS_MCP_"` Discovery ToolDiscoveryConfig ` json:"discovery"` // Servers is a map of server name to server configuration - Servers map[string]MCPServerConfig `json:"servers,omitempty"` + Servers map[string]MCPServerConfig ` json:"servers,omitempty"` } func LoadConfig(path string) (*Config, error) { @@ -935,10 +935,13 @@ func LoadConfig(path string) (*Config, error) { if passphrase := credential.PassphraseProvider(); passphrase != "" { for _, m := range cfg.ModelList { - if m.APIKey != "" && !strings.HasPrefix(m.APIKey, "enc://") && !strings.HasPrefix(m.APIKey, "file://") { - fmt.Fprintf(os.Stderr, + if m.APIKey != "" && !strings.HasPrefix(m.APIKey, "enc://") && + !strings.HasPrefix(m.APIKey, "file://") { + fmt.Fprintf( + os.Stderr, "picoclaw: warning: model %q has a plaintext api_key; call SaveConfig to encrypt it\n", - m.ModelName) + m.ModelName, + ) } } } @@ -991,7 +994,8 @@ func encryptPlaintextAPIKeys(models []ModelConfig, passphrase string) ([]ModelCo changed := false for i := range sealed { m := &sealed[i] - if m.APIKey == "" || strings.HasPrefix(m.APIKey, "enc://") || strings.HasPrefix(m.APIKey, "file://") { + if m.APIKey == "" || strings.HasPrefix(m.APIKey, "enc://") || + strings.HasPrefix(m.APIKey, "file://") { continue } encrypted, err := credential.Encrypt(passphrase, "", m.APIKey) @@ -1024,7 +1028,13 @@ func resolveAPIKeys(models []ModelConfig, configDir string) error { for j, key := range models[i].APIKeys { resolved, err := cr.Resolve(key) if err != nil { - return fmt.Errorf("model_list[%d] (%s): api_keys[%d]: %w", i, models[i].ModelName, j, err) + return fmt.Errorf( + "model_list[%d] (%s): api_keys[%d]: %w", + i, + models[i].ModelName, + j, + err, + ) } models[i].APIKeys[j] = resolved } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index d3ac2698d..075666e3e 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -21,13 +21,13 @@ import ( _ "github.com/sipeed/picoclaw/pkg/channels/line" _ "github.com/sipeed/picoclaw/pkg/channels/maixcam" _ "github.com/sipeed/picoclaw/pkg/channels/matrix" + _ "github.com/sipeed/picoclaw/pkg/channels/mqtt" _ "github.com/sipeed/picoclaw/pkg/channels/onebot" _ "github.com/sipeed/picoclaw/pkg/channels/pico" _ "github.com/sipeed/picoclaw/pkg/channels/qq" _ "github.com/sipeed/picoclaw/pkg/channels/slack" _ "github.com/sipeed/picoclaw/pkg/channels/telegram" _ "github.com/sipeed/picoclaw/pkg/channels/wecom" - _ "github.com/sipeed/picoclaw/pkg/channels/mqtt" _ "github.com/sipeed/picoclaw/pkg/channels/whatsapp" _ "github.com/sipeed/picoclaw/pkg/channels/whatsapp_native" "github.com/sipeed/picoclaw/pkg/config" @@ -44,20 +44,29 @@ import ( ) const ( - serviceShutdownTimeout = 30 * time.Second - providerReloadTimeout = 30 * time.Second + serviceShutdownTimeout = 30 * time.Second + + providerReloadTimeout = 30 * time.Second + gracefulShutdownTimeout = 15 * time.Second ) type services struct { - CronService *cron.CronService + CronService *cron.CronService + HeartbeatService *heartbeat.HeartbeatService - MediaStore media.MediaStore - ChannelManager *channels.Manager - DeviceService *devices.Service - HealthServer *health.Server + + MediaStore media.MediaStore + + ChannelManager *channels.Manager + + DeviceService *devices.Service + + HealthServer *health.Server + manualReloadChan chan struct{} - reloading atomic.Bool + + reloading atomic.Bool } type startupBlockedProvider struct { @@ -65,322 +74,549 @@ type startupBlockedProvider struct { } func (p *startupBlockedProvider) Chat( + _ context.Context, + _ []providers.Message, + _ []providers.ToolDefinition, + _ string, + _ map[string]any, + ) (*providers.LLMResponse, error) { + return nil, fmt.Errorf("%s", p.reason) + } func (p *startupBlockedProvider) GetDefaultModel() string { + return "" + } // Run starts the gateway runtime using the configuration loaded from configPath. + func Run(debug bool, configPath string, allowEmptyStartup bool) error { + if debug { + logger.SetLevel(logger.DEBUG) + fmt.Println("🔍 Debug mode enabled") + } cfg, err := config.LoadConfig(configPath) + if err != nil { + return fmt.Errorf("error loading config: %w", err) + } provider, modelID, err := createStartupProvider(cfg, allowEmptyStartup) + if err != nil { + return fmt.Errorf("error creating provider: %w", err) + } if modelID != "" { + cfg.Agents.Defaults.ModelName = modelID + } msgBus := bus.NewMessageBus() + agentLoop := agent.NewAgentLoop(cfg, msgBus, provider) fmt.Println("\n📦 Agent Status:") + startupInfo := agentLoop.GetStartupInfo() + toolsInfo := startupInfo["tools"].(map[string]any) + skillsInfo := startupInfo["skills"].(map[string]any) + fmt.Printf(" • Tools: %d loaded\n", toolsInfo["count"]) + fmt.Printf(" • Skills: %d/%d available\n", skillsInfo["available"], skillsInfo["total"]) logger.InfoCF("agent", "Agent initialized", + map[string]any{ - "tools_count": toolsInfo["count"], - "skills_total": skillsInfo["total"], + + "tools_count": toolsInfo["count"], + + "skills_total": skillsInfo["total"], + "skills_available": skillsInfo["available"], }) runningServices, err := setupAndStartServices(cfg, agentLoop, msgBus) + if err != nil { + return err + } // Setup manual reload channel for /reload endpoint + manualReloadChan := make(chan struct{}, 1) + runningServices.manualReloadChan = manualReloadChan + reloadTrigger := func() error { + if !runningServices.reloading.CompareAndSwap(false, true) { + return fmt.Errorf("reload already in progress") + } + select { + case manualReloadChan <- struct{}{}: + return nil + default: + // Should not happen, but reset flag if channel is full + runningServices.reloading.Store(false) + return fmt.Errorf("reload already queued") + } + } + runningServices.HealthServer.SetReloadFunc(reloadTrigger) + agentLoop.SetReloadFunc(reloadTrigger) fmt.Printf("✓ Gateway started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port) + fmt.Println("Press Ctrl+C to stop") ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go agentLoop.Run(ctx) var configReloadChan <-chan *config.Config + stopWatch := func() {} + if cfg.Gateway.HotReload { + configReloadChan, stopWatch = setupConfigWatcherPolling(configPath, debug) + logger.Info("Config hot reload enabled") + } + defer stopWatch() sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) for { + select { + case <-sigChan: + logger.Info("Shutting down...") + shutdownGateway(runningServices, agentLoop, provider, true) + return nil + case newCfg := <-configReloadChan: + if !runningServices.reloading.CompareAndSwap(false, true) { + logger.Warn("Config reload skipped: another reload is in progress") + continue + } + err := executeReload(ctx, agentLoop, newCfg, &provider, runningServices, msgBus, allowEmptyStartup) + if err != nil { + logger.Errorf("Config reload failed: %v", err) + } + case <-manualReloadChan: + logger.Info("Manual reload triggered via /reload endpoint") + newCfg, err := config.LoadConfig(configPath) + if err != nil { + logger.Errorf("Error loading config for manual reload: %v", err) + runningServices.reloading.Store(false) + continue + } + if err = newCfg.ValidateModelList(); err != nil { + logger.Errorf("Config validation failed: %v", err) + runningServices.reloading.Store(false) + continue + } + err = executeReload(ctx, agentLoop, newCfg, &provider, runningServices, msgBus, allowEmptyStartup) + if err != nil { + logger.Errorf("Manual reload failed: %v", err) + } else { + logger.Info("Manual reload completed successfully") + } + } + } + } func executeReload( + ctx context.Context, + agentLoop *agent.AgentLoop, + newCfg *config.Config, + provider *providers.LLMProvider, + runningServices *services, + msgBus *bus.MessageBus, + allowEmptyStartup bool, + ) error { + defer runningServices.reloading.Store(false) + return handleConfigReload(ctx, agentLoop, newCfg, provider, runningServices, msgBus, allowEmptyStartup) + } func createStartupProvider( + cfg *config.Config, + allowEmptyStartup bool, + ) (providers.LLMProvider, string, error) { + modelName := cfg.Agents.Defaults.GetModelName() + if modelName == "" && allowEmptyStartup { + reason := "no default model configured; gateway started in limited mode" + fmt.Printf("⚠ Warning: %s\n", reason) + logger.WarnCF("gateway", "Gateway started without default model", map[string]any{ + "limited_mode": true, }) + return &startupBlockedProvider{reason: reason}, "", nil + } return providers.CreateProvider(cfg) + } func setupAndStartServices( + cfg *config.Config, + agentLoop *agent.AgentLoop, + msgBus *bus.MessageBus, + ) (*services, error) { + runningServices := &services{} execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute + var err error + runningServices.CronService, err = setupCronTool( + agentLoop, + msgBus, + cfg.WorkspacePath(), + cfg.Agents.Defaults.RestrictToWorkspace, + execTimeout, + cfg, ) + if err != nil { + return nil, fmt.Errorf("error setting up cron service: %w", err) + } + if err = runningServices.CronService.Start(); err != nil { + return nil, fmt.Errorf("error starting cron service: %w", err) + } + fmt.Println("✓ Cron service started") runningServices.HeartbeatService = heartbeat.NewHeartbeatService( + cfg.WorkspacePath(), + cfg.Heartbeat.Interval, + cfg.Heartbeat.Enabled, ) + runningServices.HeartbeatService.SetBus(msgBus) + runningServices.HeartbeatService.SetHandler(createHeartbeatHandler(agentLoop)) + if err = runningServices.HeartbeatService.Start(); err != nil { + return nil, fmt.Errorf("error starting heartbeat service: %w", err) + } + fmt.Println("✓ Heartbeat service started") runningServices.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{ - Enabled: cfg.Tools.MediaCleanup.Enabled, - MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, + + Enabled: cfg.Tools.MediaCleanup.Enabled, + + MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, + Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute, }) + if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { + fms.Start() + } runningServices.ChannelManager, err = channels.NewManager(cfg, msgBus, runningServices.MediaStore) + if err != nil { + if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { + fms.Stop() + } + return nil, fmt.Errorf("error creating channel manager: %w", err) + } agentLoop.SetChannelManager(runningServices.ChannelManager) + agentLoop.SetMediaStore(runningServices.MediaStore) if transcriber := voice.DetectTranscriber(cfg); transcriber != nil { + agentLoop.SetTranscriber(transcriber) + logger.InfoCF("voice", "Transcription enabled (agent-level)", map[string]any{"provider": transcriber.Name()}) + } enabledChannels := runningServices.ChannelManager.GetEnabledChannels() + if len(enabledChannels) > 0 { + fmt.Printf("✓ Channels enabled: %s\n", enabledChannels) + } else { + fmt.Println("⚠ Warning: No channels enabled") + } addr := fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port) + runningServices.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) + runningServices.ChannelManager.SetupHTTPServer(addr, runningServices.HealthServer) if err = runningServices.ChannelManager.StartAll(context.Background()); err != nil { + return nil, fmt.Errorf("error starting channels: %w", err) + } fmt.Printf( + "✓ Health endpoints available at http://%s:%d/health, /ready and /reload (POST)\n", + cfg.Gateway.Host, + cfg.Gateway.Port, ) stateManager := state.NewManager(cfg.WorkspacePath()) + runningServices.DeviceService = devices.NewService(devices.Config{ - Enabled: cfg.Devices.Enabled, + + Enabled: cfg.Devices.Enabled, + MonitorUSB: cfg.Devices.MonitorUSB, }, stateManager) + runningServices.DeviceService.SetBus(msgBus) + if err = runningServices.DeviceService.Start(context.Background()); err != nil { + logger.ErrorCF("device", "Error starting device service", map[string]any{"error": err.Error()}) + } else if cfg.Devices.Enabled { + fmt.Println("✓ Device event service started") + } return runningServices, nil + } func stopAndCleanupServices(runningServices *services, shutdownTimeout time.Duration, isReload bool) { shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer shutdownCancel() // reload should not stop channel manager if !isReload && runningServices.ChannelManager != nil { runningServices.ChannelManager.StopAll(shutdownCtx) + } + if runningServices.DeviceService != nil { + runningServices.DeviceService.Stop() + } + if runningServices.HeartbeatService != nil { + runningServices.HeartbeatService.Stop() + } + if runningServices.CronService != nil { + runningServices.CronService.Stop() + } + if runningServices.MediaStore != nil { + if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { + fms.Stop() + } + } + } func shutdownGateway( + runningServices *services, + agentLoop *agent.AgentLoop, + provider providers.LLMProvider, + fullShutdown bool, + ) { + if cp, ok := provider.(providers.StatefulProvider); ok && fullShutdown { + cp.Close() + } stopAndCleanupServices(runningServices, gracefulShutdownTimeout, false) agentLoop.Stop() + agentLoop.Close() logger.Info("✓ Gateway stopped") + } func handleConfigReload( + ctx context.Context, + al *agent.AgentLoop, + newCfg *config.Config, + providerRef *providers.LLMProvider, + runningServices *services, + msgBus *bus.MessageBus, + allowEmptyStartup bool, + ) error { + logger.Info("🔄 Config file changed, reloading...") newModel := newCfg.Agents.Defaults.ModelName + if newModel == "" { + newModel = newCfg.Agents.Defaults.Model + } logger.Infof(" New model is '%s', recreating provider...", newModel) @@ -389,269 +625,447 @@ func handleConfigReload( stopAndCleanupServices(runningServices, serviceShutdownTimeout, true) newProvider, newModelID, err := createStartupProvider(newCfg, allowEmptyStartup) + if err != nil { + logger.Errorf(" ⚠ Error creating new provider: %v", err) + logger.Warn(" Attempting to restart services with old provider and config...") + if restartErr := restartServices(al, runningServices, msgBus); restartErr != nil { + logger.Errorf(" ⚠ Failed to restart services: %v", restartErr) + } + return fmt.Errorf("error creating new provider: %w", err) + } if newModelID != "" { + newCfg.Agents.Defaults.ModelName = newModelID + } reloadCtx, reloadCancel := context.WithTimeout(context.Background(), providerReloadTimeout) + defer reloadCancel() if err := al.ReloadProviderAndConfig(reloadCtx, newProvider, newCfg); err != nil { + logger.Errorf(" ⚠ Error reloading agent loop: %v", err) + if cp, ok := newProvider.(providers.StatefulProvider); ok { + cp.Close() + } + logger.Warn(" Attempting to restart services with old provider and config...") + if restartErr := restartServices(al, runningServices, msgBus); restartErr != nil { + logger.Errorf(" ⚠ Failed to restart services: %v", restartErr) + } + return fmt.Errorf("error reloading agent loop: %w", err) + } *providerRef = newProvider logger.Info(" Restarting all services with new configuration...") + if err := restartServices(al, runningServices, msgBus); err != nil { + logger.Errorf(" ⚠ Error restarting services: %v", err) + return fmt.Errorf("error restarting services: %w", err) + } logger.Info(" ✓ Provider, configuration, and services reloaded successfully (thread-safe)") + return nil + } func restartServices( + al *agent.AgentLoop, + runningServices *services, + msgBus *bus.MessageBus, + ) error { + cfg := al.GetConfig() execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute + var err error + runningServices.CronService, err = setupCronTool( + al, + msgBus, + cfg.WorkspacePath(), + cfg.Agents.Defaults.RestrictToWorkspace, + execTimeout, + cfg, ) + if err != nil { + return fmt.Errorf("error restarting cron service: %w", err) + } + if err = runningServices.CronService.Start(); err != nil { + return fmt.Errorf("error restarting cron service: %w", err) + } + fmt.Println(" ✓ Cron service restarted") runningServices.HeartbeatService = heartbeat.NewHeartbeatService( + cfg.WorkspacePath(), + cfg.Heartbeat.Interval, + cfg.Heartbeat.Enabled, ) + runningServices.HeartbeatService.SetBus(msgBus) + runningServices.HeartbeatService.SetHandler(createHeartbeatHandler(al)) + if err = runningServices.HeartbeatService.Start(); err != nil { + return fmt.Errorf("error restarting heartbeat service: %w", err) + } + fmt.Println(" ✓ Heartbeat service restarted") runningServices.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{ - Enabled: cfg.Tools.MediaCleanup.Enabled, - MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, + + Enabled: cfg.Tools.MediaCleanup.Enabled, + + MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, + Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute, }) + if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { + fms.Start() + } + al.SetMediaStore(runningServices.MediaStore) runningServices.ChannelManager, err = channels.NewManager(cfg, msgBus, runningServices.MediaStore) + if err != nil { + return fmt.Errorf("error recreating channel manager: %w", err) + } + al.SetChannelManager(runningServices.ChannelManager) enabledChannels := runningServices.ChannelManager.GetEnabledChannels() + if len(enabledChannels) > 0 { + fmt.Printf(" ✓ Channels enabled: %s\n", enabledChannels) + } else { + fmt.Println(" ⚠ Warning: No channels enabled") + } addr := fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port) + // Reuse existing HealthServer to preserve reloadFunc + if runningServices.HealthServer == nil { + runningServices.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) + } + runningServices.ChannelManager.SetupHTTPServer(addr, runningServices.HealthServer) if err = runningServices.ChannelManager.Reload(context.Background(), cfg); err != nil { return fmt.Errorf("error reload channels: %w", err) } + fmt.Println(" ✓ Channels restarted.") stateManager := state.NewManager(cfg.WorkspacePath()) + runningServices.DeviceService = devices.NewService(devices.Config{ - Enabled: cfg.Devices.Enabled, + + Enabled: cfg.Devices.Enabled, + MonitorUSB: cfg.Devices.MonitorUSB, }, stateManager) + runningServices.DeviceService.SetBus(msgBus) + if err := runningServices.DeviceService.Start(context.Background()); err != nil { + logger.WarnCF("device", "Failed to restart device service", map[string]any{"error": err.Error()}) + } else if cfg.Devices.Enabled { + fmt.Println(" ✓ Device event service restarted") + } transcriber := voice.DetectTranscriber(cfg) + al.SetTranscriber(transcriber) + if transcriber != nil { + logger.InfoCF("voice", "Transcription re-enabled (agent-level)", map[string]any{"provider": transcriber.Name()}) + } else { + logger.InfoCF("voice", "Transcription disabled", nil) + } return nil + } func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Config, func()) { + configChan := make(chan *config.Config, 1) + stop := make(chan struct{}) + var wg sync.WaitGroup wg.Add(1) + go func() { + defer wg.Done() lastModTime := getFileModTime(configPath) + lastSize := getFileSize(configPath) ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() for { + select { + case <-ticker.C: + currentModTime := getFileModTime(configPath) + currentSize := getFileSize(configPath) if currentModTime.After(lastModTime) || currentSize != lastSize { + if debug { + logger.Debugf("🔍 Config file change detected") + } time.Sleep(500 * time.Millisecond) lastModTime = currentModTime + lastSize = currentSize newCfg, err := config.LoadConfig(configPath) + if err != nil { + logger.Errorf("⚠ Error loading new config: %v", err) + logger.Warn(" Using previous valid config") + continue + } if err := newCfg.ValidateModelList(); err != nil { + logger.Errorf(" ⚠ New config validation failed: %v", err) + logger.Warn(" Using previous valid config") + continue + } logger.Info("✓ Config file validated and loaded") select { + case configChan <- newCfg: + default: + logger.Warn("⚠ Previous config reload still in progress, skipping") + } + } + case <-stop: + return + } + } + }() stopFunc := func() { + close(stop) + wg.Wait() + } return configChan, stopFunc + } func getFileModTime(path string) time.Time { + info, err := os.Stat(path) + if err != nil { + return time.Time{} + } + return info.ModTime() + } func getFileSize(path string) int64 { + info, err := os.Stat(path) + if err != nil { + return 0 + } + return info.Size() + } func setupCronTool( + agentLoop *agent.AgentLoop, + msgBus *bus.MessageBus, + workspace string, + restrict bool, + execTimeout time.Duration, + cfg *config.Config, + ) (*cron.CronService, error) { + cronStorePath := filepath.Join(workspace, "cron", "jobs.json") cronService := cron.NewCronService(cronStorePath, nil) var cronTool *tools.CronTool + if cfg.Tools.IsToolEnabled("cron") { + var err error + cronTool, err = tools.NewCronTool(cronService, agentLoop, msgBus, workspace, restrict, execTimeout, cfg) + if err != nil { + return nil, fmt.Errorf("critical error during CronTool initialization: %w", err) + } agentLoop.RegisterTool(cronTool) + } if cronTool != nil { + cronService.SetOnJob(func(job *cron.CronJob) (string, error) { + result := cronTool.ExecuteJob(context.Background(), job) + return result, nil + }) + } return cronService, nil + } func createHeartbeatHandler(agentLoop *agent.AgentLoop) func(prompt, channel, chatID string) *tools.ToolResult { + return func(prompt, channel, chatID string) *tools.ToolResult { + if channel == "" || chatID == "" { + channel, chatID = "cli", "direct" + } response, err := agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID) + if err != nil { + return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err)) + } + if response == "HEARTBEAT_OK" { + return tools.SilentResult("Heartbeat OK") + } + return tools.SilentResult(response) + } + } From e5f149223e1a0b43bd0f719f20a4a4f351dff1ae Mon Sep 17 00:00:00 2001 From: avaksru Date: Thu, 19 Mar 2026 11:39:27 +0300 Subject: [PATCH 09/10] fix gofmt err --- pkg/channels/mqtt/init.go | 7 ------- pkg/channels/mqtt/mqtt.go | 1 + pkg/gateway/gateway.go | 10 ---------- 3 files changed, 1 insertion(+), 17 deletions(-) diff --git a/pkg/channels/mqtt/init.go b/pkg/channels/mqtt/init.go index ef0acf181..56080f6f5 100644 --- a/pkg/channels/mqtt/init.go +++ b/pkg/channels/mqtt/init.go @@ -7,17 +7,10 @@ import ( ) func init() { - channels.RegisterFactory("mqtt", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) { - if !cfg.Channels.MQTT.Enabled { - return nil, nil - } - return NewMQTTChannel(cfg.Channels.MQTT, b) - }) - } diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index fdaaabd90..559dfcfc0 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -11,6 +11,7 @@ import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/sipeed/picoclaw/pkg/bus" "github.com/sipeed/picoclaw/pkg/channels" "github.com/sipeed/picoclaw/pkg/config" diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 075666e3e..d548276b0 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -74,27 +74,17 @@ type startupBlockedProvider struct { } func (p *startupBlockedProvider) Chat( - _ context.Context, - _ []providers.Message, - _ []providers.ToolDefinition, - _ string, - _ map[string]any, - ) (*providers.LLMResponse, error) { - return nil, fmt.Errorf("%s", p.reason) - } func (p *startupBlockedProvider) GetDefaultModel() string { - return "" - } // Run starts the gateway runtime using the configuration loaded from configPath. From 1af3a002e772591e65467761b10c7eefd9a1f557 Mon Sep 17 00:00:00 2001 From: avaksru Date: Fri, 20 Mar 2026 17:32:42 +0300 Subject: [PATCH 10/10] fix: resolve linter errors (gci, golines, gofmt, gofumpt) - Fix pkg/gateway/gateway.go: remove unnecessary leading/trailing newlines and fix gofumpt formatting - Fix pkg/config/config.go: reformat long struct tags to meet golines 120-char limit - Fix pkg/channels/mqtt/mqtt.go: fix gofmt formatting - Fix gci import ordering across pkg/config/ and pkg/channels/mqtt/ packages All linter errors resolved: 0 issues remaining --- pkg/channels/mqtt/mqtt.go | 2 +- pkg/config/config.go | 22 +- pkg/gateway/gateway.go | 409 +------------------------------------- 3 files changed, 20 insertions(+), 413 deletions(-) diff --git a/pkg/channels/mqtt/mqtt.go b/pkg/channels/mqtt/mqtt.go index 559dfcfc0..0e8a80e45 100644 --- a/pkg/channels/mqtt/mqtt.go +++ b/pkg/channels/mqtt/mqtt.go @@ -203,7 +203,7 @@ func (c *MQTTChannel) onMessage(client mqtt.Client, msg mqtt.Message) { // Check if subscribe_json_key is configured if c.config.SubscribeJSONKey != nil && *c.config.SubscribeJSONKey != "" { // Parse as JSON and extract the specified key - var jsonMsg map[string]interface{} + var jsonMsg map[string]any if err := json.Unmarshal(msg.Payload(), &jsonMsg); err == nil { // Successfully parsed as JSON if value, exists := jsonMsg[*c.config.SubscribeJSONKey]; exists { diff --git a/pkg/config/config.go b/pkg/config/config.go index babbe0ef0..36d8ad50f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -755,9 +755,9 @@ type SearXNGConfig struct { } type GLMSearchConfig struct { - Enabled bool `json:"enabled" env:"PICOCLAW_TOOLS_WEB_GLM_ENABLED"` - APIKey string `json:"api_key" env:"PICOCLAW_TOOLS_WEB_GLM_API_KEY"` - BaseURL string `json:"base_url" env:"PICOCLAW_TOOLS_WEB_GLM_BASE_URL"` + Enabled bool `json:"enabled" env:"PICOCLAW_TOOLS_WEB_GLM_ENABLED"` + APIKey string `json:"api_key" env:"PICOCLAW_TOOLS_WEB_GLM_API_KEY"` + BaseURL string `json:"base_url" env:"PICOCLAW_TOOLS_WEB_GLM_BASE_URL"` // SearchEngine specifies the search backend: "search_std" (default), // "search_pro", "search_pro_sogou", or "search_pro_quark". SearchEngine string `json:"search_engine" env:"PICOCLAW_TOOLS_WEB_GLM_SEARCH_ENGINE"` @@ -765,7 +765,7 @@ type GLMSearchConfig struct { } type WebToolsConfig struct { - ToolConfig ` envPrefix:"PICOCLAW_TOOLS_WEB_"` + ToolConfig ` envPrefix:"PICOCLAW_TOOLS_WEB_"` Brave BraveConfig ` json:"brave"` Tavily TavilyConfig ` json:"tavily"` DuckDuckGo DuckDuckGoConfig ` json:"duckduckgo"` @@ -777,13 +777,13 @@ type WebToolsConfig struct { // the client-side web_search tool is hidden to avoid duplicate search surfaces, // and the provider's built-in search is used instead. Falls back to client-side // search when the provider does not support native search. - PreferNative bool ` json:"prefer_native" env:"PICOCLAW_TOOLS_WEB_PREFER_NATIVE"` + PreferNative bool `json:"prefer_native" env:"PICOCLAW_TOOLS_WEB_PREFER_NATIVE"` // Proxy is an optional proxy URL for web tools (http/https/socks5/socks5h). // For authenticated proxies, prefer HTTP_PROXY/HTTPS_PROXY env vars instead of embedding credentials in config. - Proxy string ` json:"proxy,omitempty" env:"PICOCLAW_TOOLS_WEB_PROXY"` - FetchLimitBytes int64 ` json:"fetch_limit_bytes,omitempty" env:"PICOCLAW_TOOLS_WEB_FETCH_LIMIT_BYTES"` - Format string ` json:"format,omitempty" env:"PICOCLAW_TOOLS_WEB_FORMAT"` - PrivateHostWhitelist FlexibleStringSlice ` json:"private_host_whitelist,omitempty" env:"PICOCLAW_TOOLS_WEB_PRIVATE_HOST_WHITELIST"` + Proxy string `json:"proxy,omitempty" env:"PICOCLAW_TOOLS_WEB_PROXY"` + FetchLimitBytes int64 `json:"fetch_limit_bytes,omitempty" env:"PICOCLAW_TOOLS_WEB_FETCH_LIMIT_BYTES"` + Format string `json:"format,omitempty" env:"PICOCLAW_TOOLS_WEB_FORMAT"` + PrivateHostWhitelist FlexibleStringSlice `json:"private_host_whitelist,omitempty" env:"PICOCLAW_TOOLS_WEB_PRIVATE_HOST_WHITELIST"` } type CronToolsConfig struct { @@ -898,10 +898,10 @@ type MCPServerConfig struct { // MCPConfig defines configuration for all MCP servers type MCPConfig struct { - ToolConfig ` envPrefix:"PICOCLAW_TOOLS_MCP_"` + ToolConfig ` envPrefix:"PICOCLAW_TOOLS_MCP_"` Discovery ToolDiscoveryConfig ` json:"discovery"` // Servers is a map of server name to server configuration - Servers map[string]MCPServerConfig ` json:"servers,omitempty"` + Servers map[string]MCPServerConfig `json:"servers,omitempty"` } func LoadConfig(path string) (*Config, error) { diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index d548276b0..1a9ddf8d5 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -88,974 +88,581 @@ func (p *startupBlockedProvider) GetDefaultModel() string { } // Run starts the gateway runtime using the configuration loaded from configPath. - func Run(debug bool, configPath string, allowEmptyStartup bool) error { - if debug { - logger.SetLevel(logger.DEBUG) - fmt.Println("🔍 Debug mode enabled") - } cfg, err := config.LoadConfig(configPath) - if err != nil { - return fmt.Errorf("error loading config: %w", err) - } provider, modelID, err := createStartupProvider(cfg, allowEmptyStartup) - if err != nil { - return fmt.Errorf("error creating provider: %w", err) - } if modelID != "" { - cfg.Agents.Defaults.ModelName = modelID - } msgBus := bus.NewMessageBus() - agentLoop := agent.NewAgentLoop(cfg, msgBus, provider) fmt.Println("\n📦 Agent Status:") - startupInfo := agentLoop.GetStartupInfo() - toolsInfo := startupInfo["tools"].(map[string]any) - skillsInfo := startupInfo["skills"].(map[string]any) fmt.Printf(" • Tools: %d loaded\n", toolsInfo["count"]) - fmt.Printf(" • Skills: %d/%d available\n", skillsInfo["available"], skillsInfo["total"]) logger.InfoCF("agent", "Agent initialized", - map[string]any{ - - "tools_count": toolsInfo["count"], - - "skills_total": skillsInfo["total"], - + "tools_count": toolsInfo["count"], + "skills_total": skillsInfo["total"], "skills_available": skillsInfo["available"], }) runningServices, err := setupAndStartServices(cfg, agentLoop, msgBus) - if err != nil { - return err - } // Setup manual reload channel for /reload endpoint - manualReloadChan := make(chan struct{}, 1) - runningServices.manualReloadChan = manualReloadChan reloadTrigger := func() error { - if !runningServices.reloading.CompareAndSwap(false, true) { - return fmt.Errorf("reload already in progress") - } - select { - case manualReloadChan <- struct{}{}: - return nil - default: - // Should not happen, but reset flag if channel is full - runningServices.reloading.Store(false) - return fmt.Errorf("reload already queued") - } - } runningServices.HealthServer.SetReloadFunc(reloadTrigger) - agentLoop.SetReloadFunc(reloadTrigger) fmt.Printf("✓ Gateway started on %s:%d\n", cfg.Gateway.Host, cfg.Gateway.Port) - fmt.Println("Press Ctrl+C to stop") ctx, cancel := context.WithCancel(context.Background()) - defer cancel() go agentLoop.Run(ctx) var configReloadChan <-chan *config.Config - stopWatch := func() {} if cfg.Gateway.HotReload { - configReloadChan, stopWatch = setupConfigWatcherPolling(configPath, debug) - logger.Info("Config hot reload enabled") - } - defer stopWatch() sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) for { - select { - case <-sigChan: - logger.Info("Shutting down...") - shutdownGateway(runningServices, agentLoop, provider, true) - return nil case newCfg := <-configReloadChan: - if !runningServices.reloading.CompareAndSwap(false, true) { - logger.Warn("Config reload skipped: another reload is in progress") - continue - } - err := executeReload(ctx, agentLoop, newCfg, &provider, runningServices, msgBus, allowEmptyStartup) - if err != nil { - logger.Errorf("Config reload failed: %v", err) - } case <-manualReloadChan: - logger.Info("Manual reload triggered via /reload endpoint") - newCfg, err := config.LoadConfig(configPath) - if err != nil { - logger.Errorf("Error loading config for manual reload: %v", err) - runningServices.reloading.Store(false) - continue - } - if err = newCfg.ValidateModelList(); err != nil { - logger.Errorf("Config validation failed: %v", err) - runningServices.reloading.Store(false) - continue - } - err = executeReload(ctx, agentLoop, newCfg, &provider, runningServices, msgBus, allowEmptyStartup) - if err != nil { - logger.Errorf("Manual reload failed: %v", err) - } else { - logger.Info("Manual reload completed successfully") - } - } - } - } func executeReload( - ctx context.Context, - agentLoop *agent.AgentLoop, - newCfg *config.Config, - provider *providers.LLMProvider, - runningServices *services, - msgBus *bus.MessageBus, - allowEmptyStartup bool, - ) error { - defer runningServices.reloading.Store(false) - return handleConfigReload(ctx, agentLoop, newCfg, provider, runningServices, msgBus, allowEmptyStartup) - } func createStartupProvider( - cfg *config.Config, - allowEmptyStartup bool, - ) (providers.LLMProvider, string, error) { - modelName := cfg.Agents.Defaults.GetModelName() - if modelName == "" && allowEmptyStartup { - reason := "no default model configured; gateway started in limited mode" - fmt.Printf("⚠ Warning: %s\n", reason) - logger.WarnCF("gateway", "Gateway started without default model", map[string]any{ - "limited_mode": true, }) - return &startupBlockedProvider{reason: reason}, "", nil - } - return providers.CreateProvider(cfg) - } func setupAndStartServices( - cfg *config.Config, - agentLoop *agent.AgentLoop, - msgBus *bus.MessageBus, - ) (*services, error) { - runningServices := &services{} - execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute var err error - runningServices.CronService, err = setupCronTool( - agentLoop, - msgBus, - cfg.WorkspacePath(), - cfg.Agents.Defaults.RestrictToWorkspace, - execTimeout, - cfg, ) - if err != nil { - return nil, fmt.Errorf("error setting up cron service: %w", err) - } - if err = runningServices.CronService.Start(); err != nil { - return nil, fmt.Errorf("error starting cron service: %w", err) - } - fmt.Println("✓ Cron service started") runningServices.HeartbeatService = heartbeat.NewHeartbeatService( - cfg.WorkspacePath(), - cfg.Heartbeat.Interval, - cfg.Heartbeat.Enabled, ) - runningServices.HeartbeatService.SetBus(msgBus) - runningServices.HeartbeatService.SetHandler(createHeartbeatHandler(agentLoop)) - if err = runningServices.HeartbeatService.Start(); err != nil { - return nil, fmt.Errorf("error starting heartbeat service: %w", err) - } - fmt.Println("✓ Heartbeat service started") runningServices.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{ - - Enabled: cfg.Tools.MediaCleanup.Enabled, - - MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, - + Enabled: cfg.Tools.MediaCleanup.Enabled, + MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute, }) - if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { - fms.Start() - } runningServices.ChannelManager, err = channels.NewManager(cfg, msgBus, runningServices.MediaStore) - if err != nil { - if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { - fms.Stop() - } - return nil, fmt.Errorf("error creating channel manager: %w", err) - } - agentLoop.SetChannelManager(runningServices.ChannelManager) - agentLoop.SetMediaStore(runningServices.MediaStore) if transcriber := voice.DetectTranscriber(cfg); transcriber != nil { - agentLoop.SetTranscriber(transcriber) - logger.InfoCF("voice", "Transcription enabled (agent-level)", map[string]any{"provider": transcriber.Name()}) - } enabledChannels := runningServices.ChannelManager.GetEnabledChannels() - if len(enabledChannels) > 0 { - fmt.Printf("✓ Channels enabled: %s\n", enabledChannels) - } else { - fmt.Println("⚠ Warning: No channels enabled") - } addr := fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port) - runningServices.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) - runningServices.ChannelManager.SetupHTTPServer(addr, runningServices.HealthServer) - if err = runningServices.ChannelManager.StartAll(context.Background()); err != nil { - return nil, fmt.Errorf("error starting channels: %w", err) - } - fmt.Printf( - "✓ Health endpoints available at http://%s:%d/health, /ready and /reload (POST)\n", - cfg.Gateway.Host, - cfg.Gateway.Port, ) stateManager := state.NewManager(cfg.WorkspacePath()) - runningServices.DeviceService = devices.NewService(devices.Config{ - - Enabled: cfg.Devices.Enabled, - + Enabled: cfg.Devices.Enabled, MonitorUSB: cfg.Devices.MonitorUSB, }, stateManager) - runningServices.DeviceService.SetBus(msgBus) - if err = runningServices.DeviceService.Start(context.Background()); err != nil { - logger.ErrorCF("device", "Error starting device service", map[string]any{"error": err.Error()}) - } else if cfg.Devices.Enabled { - fmt.Println("✓ Device event service started") - } return runningServices, nil - } func stopAndCleanupServices(runningServices *services, shutdownTimeout time.Duration, isReload bool) { shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout) - defer shutdownCancel() // reload should not stop channel manager if !isReload && runningServices.ChannelManager != nil { runningServices.ChannelManager.StopAll(shutdownCtx) - } if runningServices.DeviceService != nil { - runningServices.DeviceService.Stop() - } if runningServices.HeartbeatService != nil { - runningServices.HeartbeatService.Stop() - } if runningServices.CronService != nil { - runningServices.CronService.Stop() - } if runningServices.MediaStore != nil { - if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { - fms.Stop() - } - } - } func shutdownGateway( - runningServices *services, - agentLoop *agent.AgentLoop, - provider providers.LLMProvider, - fullShutdown bool, - ) { - if cp, ok := provider.(providers.StatefulProvider); ok && fullShutdown { - cp.Close() - } - stopAndCleanupServices(runningServices, gracefulShutdownTimeout, false) - agentLoop.Stop() - agentLoop.Close() - logger.Info("✓ Gateway stopped") - } func handleConfigReload( - ctx context.Context, - al *agent.AgentLoop, - newCfg *config.Config, - providerRef *providers.LLMProvider, - runningServices *services, - msgBus *bus.MessageBus, - allowEmptyStartup bool, - ) error { - logger.Info("🔄 Config file changed, reloading...") newModel := newCfg.Agents.Defaults.ModelName - if newModel == "" { - newModel = newCfg.Agents.Defaults.Model - } logger.Infof(" New model is '%s', recreating provider...", newModel) - logger.Info(" Stopping all services...") stopAndCleanupServices(runningServices, serviceShutdownTimeout, true) newProvider, newModelID, err := createStartupProvider(newCfg, allowEmptyStartup) - if err != nil { - logger.Errorf(" ⚠ Error creating new provider: %v", err) - logger.Warn(" Attempting to restart services with old provider and config...") - if restartErr := restartServices(al, runningServices, msgBus); restartErr != nil { - logger.Errorf(" ⚠ Failed to restart services: %v", restartErr) - } - return fmt.Errorf("error creating new provider: %w", err) - } if newModelID != "" { - newCfg.Agents.Defaults.ModelName = newModelID - } reloadCtx, reloadCancel := context.WithTimeout(context.Background(), providerReloadTimeout) - defer reloadCancel() if err := al.ReloadProviderAndConfig(reloadCtx, newProvider, newCfg); err != nil { - logger.Errorf(" ⚠ Error reloading agent loop: %v", err) - if cp, ok := newProvider.(providers.StatefulProvider); ok { - cp.Close() - } - logger.Warn(" Attempting to restart services with old provider and config...") - if restartErr := restartServices(al, runningServices, msgBus); restartErr != nil { - logger.Errorf(" ⚠ Failed to restart services: %v", restartErr) - } - return fmt.Errorf("error reloading agent loop: %w", err) - } *providerRef = newProvider logger.Info(" Restarting all services with new configuration...") - if err := restartServices(al, runningServices, msgBus); err != nil { - logger.Errorf(" ⚠ Error restarting services: %v", err) - return fmt.Errorf("error restarting services: %w", err) - } logger.Info(" ✓ Provider, configuration, and services reloaded successfully (thread-safe)") - return nil - } func restartServices( - al *agent.AgentLoop, - runningServices *services, - msgBus *bus.MessageBus, - ) error { - cfg := al.GetConfig() - execTimeout := time.Duration(cfg.Tools.Cron.ExecTimeoutMinutes) * time.Minute var err error - runningServices.CronService, err = setupCronTool( - al, - msgBus, - cfg.WorkspacePath(), - cfg.Agents.Defaults.RestrictToWorkspace, - execTimeout, - cfg, ) - if err != nil { - return fmt.Errorf("error restarting cron service: %w", err) - } - if err = runningServices.CronService.Start(); err != nil { - return fmt.Errorf("error restarting cron service: %w", err) - } - fmt.Println(" ✓ Cron service restarted") runningServices.HeartbeatService = heartbeat.NewHeartbeatService( - cfg.WorkspacePath(), - cfg.Heartbeat.Interval, - cfg.Heartbeat.Enabled, ) - runningServices.HeartbeatService.SetBus(msgBus) - runningServices.HeartbeatService.SetHandler(createHeartbeatHandler(al)) - if err = runningServices.HeartbeatService.Start(); err != nil { - return fmt.Errorf("error restarting heartbeat service: %w", err) - } - fmt.Println(" ✓ Heartbeat service restarted") runningServices.MediaStore = media.NewFileMediaStoreWithCleanup(media.MediaCleanerConfig{ - - Enabled: cfg.Tools.MediaCleanup.Enabled, - - MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, - + Enabled: cfg.Tools.MediaCleanup.Enabled, + MaxAge: time.Duration(cfg.Tools.MediaCleanup.MaxAge) * time.Minute, Interval: time.Duration(cfg.Tools.MediaCleanup.Interval) * time.Minute, }) - if fms, ok := runningServices.MediaStore.(*media.FileMediaStore); ok { - fms.Start() - } - al.SetMediaStore(runningServices.MediaStore) runningServices.ChannelManager, err = channels.NewManager(cfg, msgBus, runningServices.MediaStore) - if err != nil { - return fmt.Errorf("error recreating channel manager: %w", err) - } - al.SetChannelManager(runningServices.ChannelManager) enabledChannels := runningServices.ChannelManager.GetEnabledChannels() - if len(enabledChannels) > 0 { - fmt.Printf(" ✓ Channels enabled: %s\n", enabledChannels) - } else { - fmt.Println(" ⚠ Warning: No channels enabled") - } addr := fmt.Sprintf("%s:%d", cfg.Gateway.Host, cfg.Gateway.Port) // Reuse existing HealthServer to preserve reloadFunc - if runningServices.HealthServer == nil { - runningServices.HealthServer = health.NewServer(cfg.Gateway.Host, cfg.Gateway.Port) - } - runningServices.ChannelManager.SetupHTTPServer(addr, runningServices.HealthServer) if err = runningServices.ChannelManager.Reload(context.Background(), cfg); err != nil { return fmt.Errorf("error reload channels: %w", err) } - fmt.Println(" ✓ Channels restarted.") stateManager := state.NewManager(cfg.WorkspacePath()) - runningServices.DeviceService = devices.NewService(devices.Config{ - - Enabled: cfg.Devices.Enabled, - + Enabled: cfg.Devices.Enabled, MonitorUSB: cfg.Devices.MonitorUSB, }, stateManager) - runningServices.DeviceService.SetBus(msgBus) - if err := runningServices.DeviceService.Start(context.Background()); err != nil { - logger.WarnCF("device", "Failed to restart device service", map[string]any{"error": err.Error()}) - } else if cfg.Devices.Enabled { - fmt.Println(" ✓ Device event service restarted") - } transcriber := voice.DetectTranscriber(cfg) - al.SetTranscriber(transcriber) if transcriber != nil { - logger.InfoCF("voice", "Transcription re-enabled (agent-level)", map[string]any{"provider": transcriber.Name()}) - } else { - logger.InfoCF("voice", "Transcription disabled", nil) - } return nil - } func setupConfigWatcherPolling(configPath string, debug bool) (chan *config.Config, func()) { - configChan := make(chan *config.Config, 1) - stop := make(chan struct{}) - var wg sync.WaitGroup wg.Add(1) - go func() { - defer wg.Done() lastModTime := getFileModTime(configPath) - lastSize := getFileSize(configPath) - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() for { - select { - case <-ticker.C: - currentModTime := getFileModTime(configPath) - currentSize := getFileSize(configPath) if currentModTime.After(lastModTime) || currentSize != lastSize { - if debug { - logger.Debugf("🔍 Config file change detected") - } time.Sleep(500 * time.Millisecond) - lastModTime = currentModTime - lastSize = currentSize newCfg, err := config.LoadConfig(configPath) - if err != nil { - logger.Errorf("⚠ Error loading new config: %v", err) - logger.Warn(" Using previous valid config") - continue - } if err := newCfg.ValidateModelList(); err != nil { - logger.Errorf(" ⚠ New config validation failed: %v", err) - logger.Warn(" Using previous valid config") - continue - } logger.Info("✓ Config file validated and loaded") select { - case configChan <- newCfg: - default: - logger.Warn("⚠ Previous config reload still in progress, skipping") - } - } - case <-stop: - return - } - } - }() stopFunc := func() { - close(stop) - wg.Wait() - } return configChan, stopFunc - } func getFileModTime(path string) time.Time { - info, err := os.Stat(path) - if err != nil { - return time.Time{} - } - return info.ModTime() - } func getFileSize(path string) int64 { - info, err := os.Stat(path) - if err != nil { - return 0 - } - return info.Size() - } func setupCronTool( - agentLoop *agent.AgentLoop, - msgBus *bus.MessageBus, - workspace string, - restrict bool, - execTimeout time.Duration, - cfg *config.Config, - ) (*cron.CronService, error) { - cronStorePath := filepath.Join(workspace, "cron", "jobs.json") - cronService := cron.NewCronService(cronStorePath, nil) var cronTool *tools.CronTool - if cfg.Tools.IsToolEnabled("cron") { - var err error - cronTool, err = tools.NewCronTool(cronService, agentLoop, msgBus, workspace, restrict, execTimeout, cfg) - if err != nil { - return nil, fmt.Errorf("critical error during CronTool initialization: %w", err) - } - agentLoop.RegisterTool(cronTool) - } if cronTool != nil { - cronService.SetOnJob(func(job *cron.CronJob) (string, error) { - result := cronTool.ExecuteJob(context.Background(), job) - return result, nil - }) - } return cronService, nil - } func createHeartbeatHandler(agentLoop *agent.AgentLoop) func(prompt, channel, chatID string) *tools.ToolResult { - return func(prompt, channel, chatID string) *tools.ToolResult { - if channel == "" || chatID == "" { - channel, chatID = "cli", "direct" - } response, err := agentLoop.ProcessHeartbeat(context.Background(), prompt, channel, chatID) - if err != nil { - return tools.ErrorResult(fmt.Sprintf("Heartbeat error: %v", err)) - } if response == "HEARTBEAT_OK" { - return tools.SilentResult("Heartbeat OK") - } return tools.SilentResult(response) - } - }