-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt.go
83 lines (75 loc) · 2.3 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main
import (
"encoding/json"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/spf13/viper"
"log"
"reflect"
"strings"
"time"
)
var (
// mq is the package-level MQTT client; setupMQTTClient assigns it.
mq mqtt.Client
)
// dataField is a single typed value carried in an MQTT message payload.
type dataField struct {
// Type names the Go type the value should be converted to; onMessage
// compares it against "float32" and "string".
Type string `json:"type"`
// Value is the decoded JSON value for this field.
Value any `json:"value"`
}
// mqttPayload is the JSON array of fields that onMessage decodes from a
// message body.
type mqttPayload []*dataField
// setupMQTTClient configures the package-level client mq from the "mqtt.*"
// viper settings and keeps calling Connect until an attempt finishes without
// an error, pausing five seconds between attempts.
func setupMQTTClient() {
	opts := mqtt.NewClientOptions().
		SetClientID(viper.GetString("mqtt.client_id")).
		SetUsername(viper.GetString("mqtt.username")).
		SetPassword(viper.GetString("mqtt.password")).
		SetMaxReconnectInterval(5 * time.Second).
		SetConnectTimeout(time.Second).
		SetCleanSession(viper.GetBool("mqtt.clean_session")).
		SetAutoReconnect(true).
		SetOnConnectHandler(connectHandler).
		SetConnectionLostHandler(connectionLostHandler).
		SetOrderMatters(true).
		SetKeepAlive(viper.GetDuration("mqtt.keep_alive")).
		AddBroker(viper.GetString("mqtt.broker"))

	mq = mqtt.NewClient(opts)

	for {
		attempt := mq.Connect()
		// Stop retrying once the attempt has completed without an error.
		if !attempt.Wait() || attempt.Error() == nil {
			return
		}
		log.Println("MQTT: error connecting:", attempt.Error())
		time.Sleep(5 * time.Second)
	}
}
// connectionLostHandler is registered as the client's connection-lost
// callback in setupMQTTClient. It only logs the error that caused the drop.
func connectionLostHandler(_ mqtt.Client, err error) {
	log.Println("MQTT: Connection lost:", err)
}
// connectHandler is registered as the client's on-connect callback in
// setupMQTTClient. It subscribes to "<mqtt.topic_prefix>/set/#" at QoS 0 and
// routes matching messages to onMessage.
//
// The subscription is made on the client handed to the callback rather than
// on the package-level variable, and a failed subscription is logged instead
// of being silently dropped.
func connectHandler(client mqtt.Client) {
	log.Println("MQTT: Connected!")

	prefix := viper.GetString("mqtt.topic_prefix")
	topic := fmt.Sprintf("%s%s", prefix, "/set/#")

	token := client.Subscribe(topic, 0, onMessage)

	// Check the outcome on a separate goroutine so the callback itself
	// returns promptly; the goroutine ends as soon as the token completes.
	go func() {
		token.Wait()
		if err := token.Error(); err != nil {
			log.Printf("MQTT: error subscribing to %q: %v", topic, err)
		}
	}()
}
// onMessage handles a message received on the "<prefix>/set/#" subscription.
// It derives an address from the topic segments that follow "<prefix>/set",
// decodes the JSON payload into typed values, and forwards both through
// cli.EmitMessage.
//
// Malformed input (a topic that is too short, invalid JSON, a null field, or
// a value that does not match its declared type) is logged and the message is
// dropped, so bad broker input cannot panic the process.
func onMessage(_ mqtt.Client, message mqtt.Message) {
	address, ok := mqttTopicToAddress(message.Topic(), viper.GetString("mqtt.topic_prefix"))
	if !ok {
		log.Printf("MQTT: ignoring message on unexpected topic %q", message.Topic())
		return
	}

	res := make(mqttPayload, 0, 2)
	err := json.Unmarshal(message.Payload(), &res)
	if err != nil {
		log.Println("MQTT: Invalid message payload:", err)
		return
	}

	values, err := mqttPayloadValues(res)
	if err != nil {
		log.Println("MQTT: Invalid message payload:", err)
		return
	}

	log.Println(address)
	err = cli.EmitMessage(address, values...)
	if err != nil {
		log.Println("Could not send OSC message:", err)
		return
	}
}

// mqttTopicToAddress drops the prefix segments plus the one segment after
// them from topic and joins the remainder into a "/"-rooted address. It
// reports false when the topic has fewer segments than that.
func mqttTopicToAddress(topic, prefix string) (string, bool) {
	parts := strings.Split(topic, "/")
	skip := len(strings.Split(prefix, "/")) + 1
	if len(parts) < skip {
		return "", false
	}
	return "/" + strings.Join(parts[skip:], "/"), true
}

// mqttPayloadValues converts each payload field to the Go type named by its
// Type ("float32" or "string"). Fields with any other type name are logged
// and skipped; a null field or a value that does not match its declared type
// yields an error.
func mqttPayloadValues(payload mqttPayload) ([]any, error) {
	values := make([]any, 0, len(payload))
	for i, field := range payload {
		// A JSON null array element decodes to a nil pointer.
		if field == nil {
			return nil, fmt.Errorf("field %d is null", i)
		}
		switch field.Type {
		case reflect.TypeOf(float32(0)).String():
			// encoding/json decodes every JSON number into float64.
			number, ok := field.Value.(float64)
			if !ok {
				return nil, fmt.Errorf("field %d: expected a number for type %q, got %T", i, field.Type, field.Value)
			}
			values = append(values, float32(number))
		case reflect.TypeOf("").String():
			text, ok := field.Value.(string)
			if !ok {
				return nil, fmt.Errorf("field %d: expected a string for type %q, got %T", i, field.Type, field.Value)
			}
			values = append(values, text)
		default:
			log.Printf("MQTT: skipping field %d with unsupported type %q", i, field.Type)
		}
	}
	return values, nil
}