-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.py
215 lines (161 loc) · 6.91 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import json
import logging
from re import I
import meshtastic
import meshtastic.serial_interface
import meshtastic.tcp_interface
from haversine import haversine
import time
from meshtastic import portnums_pb2, mesh_pb2
from meshtastic.__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR
import os
from plugins import plugins
from pubsub import pub
import yaml
from yaml.loader import SafeLoader
import paho.mqtt.client as mqtt
# Install the default root handler, then expose the bridge's own logger with
# its threshold lowered to DEBUG so every pipeline step is reported.
logging.basicConfig()
logger = logging.getLogger("meshtastic.bridge")
logger.setLevel(logging.DEBUG)
class CustomTCPInterface(meshtastic.tcp_interface.TCPInterface):
    """TCP interface that remembers which configured device it belongs to.

    Attributes:
        hostname: Host the interface connects to.
        device_name: Key under which the interface is stored in ``devices``.
    """

    def __init__(self, hostname, device_name):
        # Both attributes are assigned before the base initializer runs --
        # presumably so handlers fired while connecting can already read
        # them; TODO confirm against the meshtastic library.
        self.hostname = hostname
        self.device_name = device_name
        super().__init__(hostname)
def onReceive(packet, interface):  # called when a packet arrives
    """Run a packet received from a radio through every configured pipeline.

    For each entry in ``bridge_config["pipelines"]`` the raw packet is first
    passed through the ``packet_filter`` plugin, then through the pipeline's
    plugins in order; each plugin receives the previous plugin's result.
    A plugin returning a falsy value ends that pipeline early.

    Args:
        packet: The packet delivered by the ``meshtastic.receive`` topic.
        interface: The interface the packet arrived on (currently unused,
            kept because the publisher passes it).
    """
    if "pipelines" not in bridge_config:
        return

    for pipeline, pipeline_plugins in bridge_config["pipelines"].items():
        logger.debug(f"Pipeline {pipeline} initiated")

        # The filter is re-run on the raw packet for every pipeline, so each
        # pipeline starts from the filter's output rather than from whatever
        # the previous pipeline produced.
        pipeline_packet = plugins["packet_filter"].do_action(packet)

        for plugin in pipeline_plugins:
            if not pipeline_packet:
                # Nothing left to process; the remaining plugins are skipped.
                break

            for plugin_key, plugin_config in plugin.items():
                logger.debug(f"Processing plugin: {pipeline}/{plugin_key}")

                if not pipeline_packet:
                    logger.debug("Skipping since the packet is null")
                    continue

                if plugin_key not in plugins:
                    logger.error(f"No such plugin: {plugin_key}. Skipping")
                    continue

                p = plugins[plugin_key]
                p.configure(devices, mqtt_servers, plugin_config)

                # Same policy as the MQTT message handler: a failing plugin
                # is logged and the packet carries on unchanged, instead of
                # the exception escaping into the pubsub dispatcher.
                try:
                    pipeline_packet = p.do_action(pipeline_packet)
                except Exception as e:
                    logger.error(f"Hit an error: {e}", exc_info=True)

        logger.debug(f"Pipeline {pipeline} completed")
def onConnection(interface, topic=pub.AUTO_TOPIC):
    """Log which node we are talking to; called when we (re)connect to the radio.

    Args:
        interface: The interface whose connection was just established.
        topic: Topic object supplied by pubsub (unused).
    """
    user = interface.getMyNodeInfo()["user"]
    user_id = user["id"]
    hw_model = user["hwModel"]
    logger.info(f"Connected to node: userId={user_id} hwModel={hw_model}")
def onLost(interface):
    """Try once to re-open a TCP device whose connection was lost.

    The replacement interface is stored in ``devices`` under the same device
    name, so later lookups by name find the new connection.

    Args:
        interface: The interface that reported the lost connection.
    """
    # Only interfaces created as CustomTCPInterface carry the hostname and
    # device_name needed to reconnect; the serial interfaces this file creates
    # are not given those attributes, so touching them would raise
    # AttributeError inside the callback.
    if not isinstance(interface, CustomTCPInterface):
        logger.warning(
            "Connection lost on a non-TCP interface; automatic reconnect is not supported"
        )
        return

    logger.debug(f"Connecting to {interface.hostname} ...")
    try:
        devices[interface.device_name] = CustomTCPInterface(
            hostname=interface.hostname, device_name=interface.device_name
        )
    except Exception as e:
        # The host may still be unreachable; report it rather than letting
        # the exception propagate out of the event callback.
        logger.error(
            f"Could not reconnect to {interface.hostname}: {e}", exc_info=True
        )
        return
    logger.debug(f"Connected to {interface.hostname}")
# Attach each module-level handler to the pubsub topic it serves.
for _handler, _topic_name in (
    (onReceive, "meshtastic.receive"),
    (onConnection, "meshtastic.connection.established"),
    (onLost, "meshtastic.connection.lost"),
):
    pub.subscribe(_handler, _topic_name)
# Load the bridge configuration from config.yaml in the working directory.
with open("config.yaml", encoding="utf-8") as f:
    # An empty YAML document parses to None; fall back to an empty mapping so
    # the membership tests on bridge_config ("devices" in ..., "pipelines"
    # in ...) do not raise TypeError.
    bridge_config = yaml.load(f, Loader=SafeLoader) or {}

# Registries handed to every plugin via configure():
#   devices      -- device name -> meshtastic interface
#   mqtt_servers -- server name -> MQTT client
devices = {}
mqtt_servers = {}
# Open one meshtastic interface per configured device that is not disabled.
if "devices" in bridge_config:
    for device in bridge_config["devices"]:
        # A device counts as active unless it carries a falsy "active" flag.
        is_active = "active" not in device or device["active"]
        if not is_active:
            continue

        # "serial" takes precedence over "tcp" when both keys are present.
        if "serial" not in device and "tcp" in device:
            logger.debug(f"Connecting to {device['tcp']} ...")
            devices[device["name"]] = CustomTCPInterface(
                hostname=device["tcp"], device_name=device["name"]
            )
            logger.debug(f"Connected to {device['tcp']}")
            continue

        # Serial device: use the configured path when given, otherwise call
        # SerialInterface with no arguments.
        serial_kwargs = {"devPath": device["serial"]} if "serial" in device else {}
        devices[device["name"]] = meshtastic.serial_interface.SerialInterface(
            **serial_kwargs
        )
def _connect_mqtt_server(config):
    """Create, register and start the MQTT client described by ``config``.

    The callbacks are defined inside this function so that each client's
    handlers close over *its own* ``config``; defining them in the
    module-level loop made every callback read the loop variable, i.e. the
    last server's settings, by the time a message arrived.

    Args:
        config: One entry of ``bridge_config["mqtt_servers"]``. Must contain
            ``name``, ``server`` and ``port``; may contain ``client_id``,
            ``username``, ``password``, ``insecure``, ``topic`` and
            ``pipelines``.
    """
    import ssl  # local import, as in the original code

    name = config["name"]
    client_id = config.get("client_id")
    username = config.get("username")
    password = config.get("password")

    logger.info(f"Configuring MQTT {name}")

    mqttc = mqtt.Client(client_id) if client_id else mqtt.Client()

    # Credentials are applied only when both parts are present.
    if username and password:
        mqttc.username_pw_set(username, password)

    def on_connect(mqttc, obj, flags, rc):
        if rc != 0:
            logger.error(f"MQTT {name}: connection failed (rc={rc})")
            return
        logger.debug(f"Connected to MQTT {name}")
        # Subscribing here rather than once after connect() renews the
        # subscription every time the client reconnects.
        if "topic" in config:
            mqttc.subscribe(config["topic"], 0)

    def on_message(mqttc, obj, msg):
        orig_packet = msg.payload.decode()

        logger.debug(f"MQTT {name}: {orig_packet}")

        if "pipelines" not in config:
            logger.warning(f"MQTT {name}: no pipeline")
            return

        # The filter runs once; every pipeline starts from its result.
        pipeline_packet = plugins["packet_filter"].do_action(orig_packet)

        for pipeline, pipeline_plugins in config["pipelines"].items():
            packet = pipeline_packet

            logger.debug(f"MQTT {name} pipeline {pipeline} initiated")
            if not packet:
                continue

            for plugin in pipeline_plugins:
                if not packet:
                    break

                for plugin_key, plugin_config in plugin.items():
                    if not packet:
                        # A previous plugin dropped the packet.
                        break

                    if plugin_key not in plugins:
                        logger.error(f"No such plugin: {plugin_key}. Skipping")
                        continue

                    p = plugins[plugin_key]
                    p.configure(devices, mqtt_servers, plugin_config)

                    # A failing plugin is logged and the packet carries on
                    # unchanged to the next plugin.
                    try:
                        packet = p.do_action(packet)
                    except Exception as e:
                        logger.error(f"Hit an error: {e}", exc_info=True)

            logger.debug(f"MQTT {name} pipeline {pipeline} finished")

    def on_publish(mqttc, obj, mid):
        logger.debug(f"MQTT {name}: on_publish: {mid}")

    def on_subscribe(mqttc, obj, mid, granted_qos):
        logger.debug(f"MQTT {name}: on_subscribe: {mid}")

    mqttc.on_message = on_message
    mqttc.on_connect = on_connect
    mqttc.on_publish = on_publish
    mqttc.on_subscribe = on_subscribe

    # Registered before connecting, so the client is visible to plugins even
    # if the first connection attempt below fails.
    mqtt_servers[name] = mqttc

    # NOTE(review): TLS is enabled only via "insecure", and then with
    # certificate checks switched off; there is no option here for verified TLS.
    if config.get("insecure"):
        mqttc.tls_set(cert_reqs=ssl.CERT_NONE)
        mqttc.tls_insecure_set(True)

    try:
        logger.debug(f"Connecting to MQTT {config['server']}")
        mqttc.connect(config["server"], config["port"], 60)
        mqttc.loop_start()
    except Exception as e:
        logger.error(f"MQTT {name} could not start: {e}")


# Start one MQTT client per fully specified server entry.
if "mqtt_servers" in bridge_config:
    required_options = ("name", "server", "port")

    for config in bridge_config["mqtt_servers"]:
        missing_options = [
            option for option in required_options if option not in config
        ]
        if missing_options:
            for option in missing_options:
                logger.warning(f"Missing config: {option}")
            # Without these the client cannot be named or connected.
            continue

        _connect_mqtt_server(config)
# Keep the main thread alive; the bridge's work is done in the pubsub and MQTT
# callbacks registered earlier in this file. The loop only ends on an
# interrupt, so the cleanup lives in ``finally`` -- placed after the loop it
# could never run.
try:
    while True:
        time.sleep(1000)
except KeyboardInterrupt:
    logger.info("Interrupted, shutting down")
finally:
    # Snapshot the registries: a reconnect handler may still replace entries.
    for name, instance in list(devices.items()):
        try:
            instance.close()
        except Exception as e:
            # One device failing to close must not block the others.
            logger.error(f"Could not close device {name}: {e}")

    for name, instance in list(mqtt_servers.items()):
        try:
            instance.disconnect()
        except Exception as e:
            logger.error(f"Could not disconnect MQTT {name}: {e}")