From 159b3b097d79578c209bbefa2beb4d374bc5727d Mon Sep 17 00:00:00 2001 From: Geoff Whittington Date: Sun, 4 Dec 2022 14:25:38 -0500 Subject: [PATCH] Added use case config, fixed bugs --- main.py | 28 ++++-- plugins.py | 140 +++++++++++++++++------------- use-cases/mqtt_bridge/config.yaml | 25 ++++++ 3 files changed, 124 insertions(+), 69 deletions(-) create mode 100644 use-cases/mqtt_bridge/config.yaml diff --git a/main.py b/main.py index 7ca7c02..708218d 100644 --- a/main.py +++ b/main.py @@ -120,6 +120,8 @@ def onLost(interface): username = config["username"] if "username" in config else None password = config["password"] if "password" in config else None + logger.info(f"Connected to MQTT {config['name']}") + if client_id: mqttc = mqtt.Client(client_id) else: @@ -128,8 +130,6 @@ def onLost(interface): if username and password: mqttc.username_pw_set(username, password) - mqtt_servers[config["name"]] = mqttc - def on_connect(mqttc, obj, flags, rc): logger.debug(f"Connected to MQTT {config['name']}") @@ -137,16 +137,20 @@ def on_message(mqttc, obj, msg): orig_packet = msg.payload.decode() logger.debug(f"MQTT {config['name']}: on_message") + logger.debug(f"MQTT {config['name']}: {orig_packet}") if "pipelines" not in config: logger.warning(f"MQTT {config['name']}: no pipeline") return + p = plugins["packet_filter"] + pipeline_packet = p.do_action(orig_packet) + for pipeline, pipeline_plugins in config["pipelines"].items(): - packet = orig_packet + packet = pipeline_packet - logger.debug(f"MQTT {config['name']} pipeline {pipeline} started") + logger.debug(f"MQTT {config['name']} pipeline {pipeline} initiated") if not packet: continue @@ -179,18 +183,26 @@ def on_subscribe(mqttc, obj, mid, granted_qos): mqttc.on_publish = on_publish mqttc.on_subscribe = on_subscribe + mqtt_servers[config["name"]] = mqttc + import ssl if "insecure" in config and config["insecure"]: mqttc.tls_set(cert_reqs=ssl.CERT_NONE) mqttc.tls_insecure_set(True) - mqttc.connect(config["server"], config["port"], 60) + try: + logger.debug(f"Connecting to MQTT {config['server']}") + + mqttc.connect(config["server"], config["port"], 60) - if "topic" in config: - mqttc.subscribe(config["topic"], 0) + if "topic" in config: + mqttc.subscribe(config["topic"], 0) - mqttc.loop_start() + mqttc.loop_start() + except Exception as e: + logger.error(f"MQTT {config['name']} could not start: {e}") + pass while True: time.sleep(1000) diff --git a/plugins.py b/plugins.py index cf641ea..fd394e5 100644 --- a/plugins.py +++ b/plugins.py @@ -11,7 +11,10 @@ plugins = {} -class Plugin: +class Plugin(object): + def __init__(self) -> None: + self.logger.setLevel(logging.INFO) + def configure(self, devices, mqtt_servers, config): self.config = config self.devices = devices @@ -30,25 +33,42 @@ def do_action(self, packet): class PacketFilter(Plugin): logger = logging.getLogger(name="meshtastic.bridge.filter.packet") - def strip_raw(self, dict_obj): - if type(dict_obj) is not dict: - return dict_obj + def strip_raw(self, data): + if type(data) is not dict: + return data + + if "raw" in data: + del data["raw"] - if "raw" in dict_obj: - del dict_obj["raw"] + for k, v in data.items(): + data[k] = self.strip_raw(v) - for k, v in dict_obj.items(): - dict_obj[k] = self.strip_raw(v) + return data - return dict_obj + def normalize(self, dict_obj): + """ + Packets are either a dict, string dict or string + """ + if type(dict_obj) is not dict: + try: + dict_obj = json.loads(dict_obj) + except: + dict_obj = {"decoded": {"text": dict_obj}} + + return self.strip_raw(dict_obj) def do_action(self, packet): - packet = self.strip_raw(packet) + self.logger.debug(f"Before normalization: {packet}") + packet = self.normalize(packet) if "decoded" in packet and "payload" in packet["decoded"]: - packet["decoded"]["payload"] = base64.b64encode( - packet["decoded"]["payload"] - ).decode("utf-8") + if type(packet["decoded"]["payload"]) is bytes: + text = packet["decoded"]["payload"] + packet["decoded"]["payload"] = base64.b64encode( + packet["decoded"]["payload"] + ).decode("utf-8") + + self.logger.debug(f"After normalization: {packet}") return packet @@ -90,15 +110,17 @@ def do_action(self, packet): ) return None - if text and "disallow" in self.config["message"]: - matches = False - for disallow_regex in self.config["message"]["disallow"]: - if not matches and re.search(disallow_regex, text): - matches = True + if "disallow" in self.config["message"]: + matches = False + for disallow_regex in self.config["message"]["disallow"]: + if not matches and re.search(disallow_regex, text): + matches = True - if matches: - self.logger.debug(f"Dropped because it matches message disallow filter") - return None + if matches: + self.logger.debug( + f"Dropped because it matches message disallow filter" + ) + return None filters = { "app": packet["decoded"]["portnum"], @@ -116,7 +138,7 @@ def do_action(self, packet): and value not in filter_val["allow"] ): self.logger.debug( - f"Dropped because it doesn't match {filter_key} allow filter" + f"Dropped because {value} doesn't match {filter_key} allow filter" ) return None @@ -126,7 +148,7 @@ def do_action(self, packet): and value in filter_val["disallow"] ): self.logger.debug( - f"Dropped because it matches {filter_key} disallow filter" + f"Dropped because {value} matches {filter_key} disallow filter" ) return None @@ -205,13 +227,6 @@ class WebhookPlugin(Plugin): logger = logging.getLogger(name="meshtastic.bridge.plugin.webhook") def do_action(self, packet): - if type(packet) is not dict: - try: - packet = json.loads(packet) - except: - self.logger.warning("Packet is not dict") - return packet - if "active" in self.config and not self.config["active"]: return packet @@ -281,9 +296,16 @@ def do_action(self, packet): mqtt_server = self.mqtt_servers[self.config["name"]] - packet_payload = packet if type(packet) is str else json.dumps(packet) + if not mqtt_server.is_connected(): + self.logger.error("Not sent, not connected") + return + + packet_message = json.dumps(packet) - message = self.config["message"] if "message" in self.config else packet_payload + if "message" in self.config: + message = self.config["message"].replace("{MSG}", packet["decoded"]["text"]) + else: + message = packet_message info = mqtt_server.publish(self.config["topic"], message) info.wait_for_publish() @@ -361,48 +383,36 @@ class RadioMessagePlugin(Plugin): logger = logging.getLogger(name="meshtastic.bridge.plugin.send") def do_action(self, packet): - - if type(packet) is not dict: - try: - packet = json.loads(packet) - except: - self.logger.error("Packet is not a dict") - return packet - if self.config["device"] not in self.devices: self.logger.error(f"Missing interface for device {self.config['device']}") return packet - if "to" not in packet and "toId" not in packet: - self.logger.debug("Not a message") - return packet - - # Broadcast messages or specific - if ( - "node_mapping" in self.config - and packet["to"] in self.config["node_mapping"] - ): - destinationId = self.config["node_mapping"][packet["to"]] - else: - destinationId = packet["to"] if "to" in packet else packet["toId"] + destinationId = None if "to" in self.config: destinationId = self.config["to"] elif "toId" in self.config: destinationId = self.config["toId"] + elif "node_mapping" in self.config and "to" in packet: + destinationId = self.config["node_mapping"][packet["to"]] + elif "to" in packet: + destinationId = packet["to"] + elif "toId" in packet: + destinationId = packet["toId"] - device_name = self.config["device"] - - if device_name not in self.devices: - self.logger.warning(f"No such radio device: {device_name}") + if not destinationId: + self.logger.error("Missing 'to' property in config or packet") return packet + device_name = self.config["device"] + device = self.devices[device_name] - self.logger.debug(f"Sending packet to Radio {device_name}") + # Not a radio packet + if "decoded" in packet and "text" in packet["decoded"] and "from" not in packet: + self.logger.debug(f"Sending text to Radio {device_name}") + device.sendText(text=packet["decoded"]["text"], destinationId=destinationId) - if "message" in self.config and self.config["message"]: - device.sendText(text=self.config["message"], destinationId=destinationId) elif ( "lat" in self.config and self.config["lat"] > 0 @@ -413,20 +423,28 @@ def do_action(self, packet): lng = self.config["lng"] altitude = self.config["alt"] if "alt" in self.config else 0 + self.logger.debug(f"Sending position to Radio {device_name}") + device.sendPosition( latitude=lat, longitude=lng, altitude=altitude, destinationId=destinationId, ) - else: + elif ( + "decoded" in packet + and "payload" in packet["decoded"] + and "portnum" in packet["decoded"] + ): meshPacket = mesh_pb2.MeshPacket() meshPacket.channel = 0 meshPacket.decoded.payload = base64.b64decode(packet["decoded"]["payload"]) - meshPacket.decoded.portnum = int(packet["decoded"]["portnum"]) + meshPacket.decoded.portnum = packet["decoded"]["portnum"] meshPacket.decoded.want_response = False meshPacket.id = device._generatePacketId() + self.logger.debug(f"Sending packet to Radio {device_name}") + device._sendPacket(meshPacket=meshPacket, destinationId=destinationId) return packet diff --git a/use-cases/mqtt_bridge/config.yaml b/use-cases/mqtt_bridge/config.yaml new file mode 100644 index 0000000..ddd4eea --- /dev/null +++ b/use-cases/mqtt_bridge/config.yaml @@ -0,0 +1,25 @@ +devices: + - name: radio1 + tcp: 192.168.86.27 +mqtt_servers: + - name: external + server: broker.hivemq.com + port: 1883 + topic: meshtastic/radio-network1 + pipelines: + mqtt-to-radio: + - radio_message_plugin: + device: radio1 + to: "^all" +pipelines: + pipeline1: + - debugger: + log_level: debug + radio-to-mqtt: + - message_filter: + app: + allow: + - "TEXT_MESSAGE_APP" + - mqtt_plugin: + name: external + topic: meshtastic/radio-network1