Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions config/config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,32 @@
"enabled": false
},
"reasoning_channel_id": ""
},
"mqtt": {
"enabled": false,
"broker": "tcp://broker.emqx.io:1883",
"client_id": "picoclaw-001",
"username": "",
"password": "",
"subscribe_topics": [
"picoclaw/input"
],
"subscribe_json_key": null,
"reply_topic": "picoclaw/output",
"reply_json_key": null,
"tls": false,
"tls_ca": "",
"tls_cert": "",
"tls_key": "",
"qos": 1,
"retain": false,
"prefix": "/mqtt",
"instruction": "Π‘ΡƒΠ΄ΡŒ ΠΏΡ€Π΅Π΄Π΅Π»ΡŒΠ½ΠΎ ΠΊΡ€Π°Ρ‚ΠΎΠΊ",
"allow_from": [],
"group_trigger": {
"mention_only": false
},
"reasoning_channel_id": ""
}
},
"providers": {
Expand Down
142 changes: 142 additions & 0 deletions docs/channels/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# MQTT Channel Configuration Guide

## 1. Example Configuration

Add this to `config.json`:

```json
{
"channels": {
"mqtt": {
"enabled": true,
"broker": "tcp://localhost:1883",
"client_id": "picoclaw-bot",
"username": "",
"password": "",
"qos": 1,
"retain": false,
"tls": false,
"subscribe_topics": [
"picoclaw/input"
],
"subscribe_json_key": null,
"reply_topic": "picoclaw/output",
"reply_json_key": null,
"allow_from": [],
"group_trigger": {
"mention_only": true
},
"reasoning_channel_id": "",
"instruction": ""
}
}
}
```

## 2. Field Reference

| Field | Type | Required | Description |
|----------------------|----------|----------|-------------|
| enabled | bool | Yes | Enable or disable the MQTT channel |
| broker | string | Yes | MQTT broker URL (e.g., `tcp://localhost:1883`, `ssl://broker.example.com:8883`) |
| client_id | string | Yes | Unique client identifier for the MQTT connection |
| username | string | No | MQTT username for authentication |
| password | string | No | MQTT password for authentication |
| qos | int | No | Quality of Service level (0, 1, or 2). Default: 1 |
| retain | bool | No | Whether to retain messages. Default: false |
| tls | bool | No | Enable TLS/SSL connection. Default: false |
| subscribe_topics | []string | Yes | List of MQTT topics to subscribe to for incoming messages |
| subscribe_json_key | string | No | JSON key to extract from incoming messages. If null, treats message as plain text |
| reply_topic | string | No | Topic to publish replies to. Supports placeholders: `{client_id}`, `{topic}` |
| reply_json_key | string | No | JSON key to use when sending replies. If null, sends as plain text |
| allow_from | []string | No | Client ID whitelist (empty allows all) |
| group_trigger | object | No | Group trigger strategy (`mention_only` / `prefixes`) |
| reasoning_channel_id | string | No | Target channel for reasoning output |
| instruction | string | No | Optional instruction prefix added to all incoming messages |

## 3. Currently Supported

- **Message Format**: Supports both JSON and plain text messages
- JSON format: `{"status": "your message"}`
- Plain text: Direct text content
- Automatic JSON parsing with fallback to plain text for malformed JSON
- **JSON Key Extraction**: When `subscribe_json_key` is set, extracts specific field from JSON messages
- **JSON Response Formatting**: When `reply_json_key` is set, sends responses as JSON with specified key
- **Authentication**: Username/password authentication support
- **TLS/SSL**: Secure connections with TLS configuration
- **Quality of Service**: Configurable QoS levels (0, 1, 2)
- **Topic Management**: Multiple subscribe topics and configurable reply topics
- **Message Retention**: Optional message retention on broker
- **Auto-reconnection**: Automatic reconnection with exponential backoff
- **Group Triggers**: Support for mention-only and prefix-based triggers
- **Placeholder Replacement**: Dynamic topic names using `{client_id}` and `{topic}` placeholders

## 4. Features

- **Robust Message Handling**: Intelligent parsing that handles malformed JSON gracefully
- **Flexible Topic Configuration**: Support for multiple input topics and dynamic reply topics
- **JSON Message Processing**: Configurable JSON key extraction and response formatting
- **Connection Resilience**: Automatic reconnection with configurable retry intervals
- **Security**: TLS support and authentication for secure communication
- **Message Routing**: Support for reasoning channel routing and group trigger rules

## 5. Usage Notes

- The channel automatically handles malformed JSON by attempting to clean and parse it
- Reply topics can use placeholders to dynamically route responses
- Client IDs are used as sender identifiers in the messaging system
- Topics are treated as channels for message routing purposes
- The instruction field allows adding context or commands to all incoming messages

## 6. JSON Configuration Examples

### Plain Text Mode (Default)
```json
{
"subscribe_json_key": null,
"reply_json_key": null
}
```
- Incoming messages are treated as plain text
- Outgoing messages are sent as plain text

### JSON Input Mode
```json
{
"subscribe_json_key": "message",
"reply_json_key": null
}
```
- Incoming JSON: `{"message": "Hello world", "timestamp": 1234567890}`
- Extracted content: `"Hello world"`
- Outgoing messages are sent as plain text

### JSON Output Mode
```json
{
"subscribe_json_key": null,
"reply_json_key": "response"
}
```
- Incoming messages are treated as plain text
- Outgoing JSON: `{"response": "Bot reply"}`

### Full JSON Mode
```json
{
"subscribe_json_key": "input",
"reply_json_key": "output"
}
```
- Incoming JSON: `{"input": "What's the weather?", "location": "Moscow"}`
- Extracted content: `"What's the weather?"`
- Outgoing JSON: `{"output": "The weather is sunny"}`

## 7. Troubleshooting

### Common Issues

1. **JSON parsing fails**: Ensure your JSON messages are valid
2. **Key not found**: Verify the JSON key exists in your messages
3. **Connection issues**: Check broker URL, credentials, and TLS settings
4. **Permission denied**: Verify client ID is in the `allow_from` list if configured
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/anthropics/anthropic-sdk-go v1.26.0
github.com/bwmarrin/discordgo v0.29.0
github.com/caarlos0/env/v11 v11.4.0
github.com/eclipse/paho.mqtt.golang v1.5.1
github.com/ergochat/irc-go v0.6.0
github.com/ergochat/readline v0.1.3
github.com/gdamore/tcell/v2 v2.13.8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
github.com/elliotchance/orderedmap/v3 v3.1.0 h1:j4DJ5ObEmMBt/lcwIecKcoRxIQUEnw0L804lXYDt/pg=
github.com/elliotchance/orderedmap/v3 v3.1.0/go.mod h1:G+Hc2RwaZvJMcS4JpGCOyViCnGeKf0bTYCGTO4uhjSo=
github.com/ergochat/irc-go v0.6.0 h1:Y0AGV76aeihJfCtLaQh+OyJKFiKGrYC0VTkeMZ6XW28=
Expand Down
55 changes: 45 additions & 10 deletions pkg/channels/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,13 @@ func (m *Manager) RecordReactionUndo(channel, chatID string, undo func()) {
}

// preSend handles typing stop, reaction undo, and placeholder editing before sending a message.
// Returns true if the message was already delivered (skip Send).
func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMessage, ch Channel) bool {
// Returns true if the message was edited into a placeholder (skip Send).
func (m *Manager) preSend(
ctx context.Context,
name string,
msg bus.OutboundMessage,
ch Channel,
) bool {
key := name + ":" + msg.ChatID

// 1. Stop typing
Expand Down Expand Up @@ -206,7 +211,11 @@ func (m *Manager) preSend(ctx context.Context, name string, msg bus.OutboundMess
return false
}

func NewManager(cfg *config.Config, messageBus *bus.MessageBus, store media.MediaStore) (*Manager, error) {
func NewManager(
cfg *config.Config,
messageBus *bus.MessageBus,
store media.MediaStore,
) (*Manager, error) {
m := &Manager{
channels: make(map[string]Channel),
workers: make(map[string]*channelWorker),
Expand Down Expand Up @@ -397,6 +406,12 @@ func (m *Manager) initChannels(channels *config.ChannelsConfig) error {
m.initChannel("irc", "IRC")
}

if m.config.Channels.MQTT.Enabled && m.config.Channels.MQTT.Broker != "" &&
m.config.Channels.MQTT.ClientID != "" &&
len(m.config.Channels.MQTT.SubscribeTopics) > 0 {
m.initChannel("mqtt", "MQTT")
}

logger.InfoCF("channels", "Channel initialization completed", map[string]any{
"enabled_channels": len(m.channels),
})
Expand Down Expand Up @@ -615,7 +630,12 @@ func (m *Manager) runWorker(ctx context.Context, name string, w *channelWorker)
// - ErrNotRunning / ErrSendFailed: permanent, no retry
// - ErrRateLimit: fixed delay retry
// - ErrTemporary / unknown: exponential backoff retry
func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMessage) {
func (m *Manager) sendWithRetry(
ctx context.Context,
name string,
w *channelWorker,
msg bus.OutboundMessage,
) {
// Rate limit: wait for token
if err := w.limiter.Wait(ctx); err != nil {
// ctx canceled, shutting down
Expand Down Expand Up @@ -655,7 +675,10 @@ func (m *Manager) sendWithRetry(ctx context.Context, name string, w *channelWork
}

// ErrTemporary or unknown error β€” exponential backoff
backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
backoff := min(
time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))),
maxBackoff,
)
select {
case <-time.After(backoff):
case <-ctx.Done():
Expand Down Expand Up @@ -780,12 +803,21 @@ func (m *Manager) runMediaWorker(ctx context.Context, name string, w *channelWor

// sendMediaWithRetry sends a media message through the channel with rate limiting and
// retry logic. If the channel does not implement MediaSender, it silently skips.
func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channelWorker, msg bus.OutboundMediaMessage) {
func (m *Manager) sendMediaWithRetry(
ctx context.Context,
name string,
w *channelWorker,
msg bus.OutboundMediaMessage,
) {
ms, ok := w.ch.(MediaSender)
if !ok {
logger.DebugCF("channels", "Channel does not support MediaSender, skipping media", map[string]any{
"channel": name,
})
logger.DebugCF(
"channels",
"Channel does not support MediaSender, skipping media",
map[string]any{
"channel": name,
},
)
return
}

Expand Down Expand Up @@ -822,7 +854,10 @@ func (m *Manager) sendMediaWithRetry(ctx context.Context, name string, w *channe
}

// ErrTemporary or unknown error β€” exponential backoff
backoff := min(time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))), maxBackoff)
backoff := min(
time.Duration(float64(baseBackoff)*math.Pow(2, float64(attempt))),
maxBackoff,
)
select {
case <-time.After(backoff):
case <-ctx.Done():
Expand Down
16 changes: 16 additions & 0 deletions pkg/channels/mqtt/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mqtt

import (
"github.com/sipeed/picoclaw/pkg/bus"
"github.com/sipeed/picoclaw/pkg/channels"
"github.com/sipeed/picoclaw/pkg/config"
)

func init() {
channels.RegisterFactory("mqtt", func(cfg *config.Config, b *bus.MessageBus) (channels.Channel, error) {
if !cfg.Channels.MQTT.Enabled {
return nil, nil
}
return NewMQTTChannel(cfg.Channels.MQTT, b)
})
}
Loading
Loading