From bed9b5ffc0f6b91ffb2a38d3333d6f24b7c766e7 Mon Sep 17 00:00:00 2001 From: Jeremiah K <17190268+jeremiah-k@users.noreply.github.com> Date: Fri, 13 Dec 2024 09:26:30 -0600 Subject: [PATCH 1/2] Adding message map management tools --- db_utils.py | 39 +++++++++++++++++++++ main.py | 18 +++++++++- matrix_utils.py | 84 ++++++++++++++++++++++++++++++--------------- meshtastic_utils.py | 16 ++++----- sample_config.yaml | 5 +++ 5 files changed, 126 insertions(+), 36 deletions(-) diff --git a/db_utils.py b/db_utils.py index c68cc3f..b5eaa48 100644 --- a/db_utils.py +++ b/db_utils.py @@ -187,3 +187,42 @@ def get_message_map_by_matrix_event_id(matrix_event_id): # result = (meshtastic_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) return result[0], result[1], result[2], result[3] return None + +def wipe_message_map(): + """ + Wipes all entries from the message_map table. + Useful when db.msg_map.wipe_on_restart is True, ensuring no stale data remains. + """ + with sqlite3.connect("meshtastic.sqlite") as conn: + cursor = conn.cursor() + cursor.execute("DELETE FROM message_map") + conn.commit() + logger.info("message_map table wiped successfully.") + +def prune_message_map(msgs_to_keep): + """ + Prune the message_map table to keep only the most recent msgs_to_keep entries + in order to prevent database bloat. + We use the matrix_event_id's insertion order as a heuristic. + Note: matrix_event_id is a string, so we rely on the rowid or similar approach. + + Approach: + - Count total rows. + - If total > msgs_to_keep, delete oldest entries based on rowid. + """ + with sqlite3.connect("meshtastic.sqlite") as conn: + cursor = conn.cursor() + # Count total entries + cursor.execute("SELECT COUNT(*) FROM message_map") + total = cursor.fetchone()[0] + + if total > msgs_to_keep: + # Delete oldest entries by rowid since matrix_event_id is primary key but not necessarily numeric. + # rowid is auto-incremented and reflects insertion order. + to_delete = total - msgs_to_keep + cursor.execute( + "DELETE FROM message_map WHERE rowid IN (SELECT rowid FROM message_map ORDER BY rowid ASC LIMIT ?)", + (to_delete,) + ) + conn.commit() + logger.info(f"Pruned {to_delete} old message_map entries, keeping last {msgs_to_keep}.") diff --git a/main.py b/main.py index dcf1076..778efc6 100644 --- a/main.py +++ b/main.py @@ -14,7 +14,7 @@ # Import meshtastic_utils as a module to set event_loop import meshtastic_utils from config import relay_config -from db_utils import initialize_database, update_longnames, update_shortnames +from db_utils import initialize_database, update_longnames, update_shortnames, wipe_message_map from log_utils import get_logger from matrix_utils import connect_matrix, join_matrix_room from matrix_utils import logger as matrix_logger @@ -36,6 +36,8 @@ async def main(): """ Main asynchronous function to set up and run the relay. + Includes logic for wiping the message_map if configured in db.msg_map.wipe_on_restart. + Also updates longnames and shortnames periodically as before. """ # Set the event loop in meshtastic_utils meshtastic_utils.event_loop = asyncio.get_event_loop() @@ -43,6 +45,15 @@ async def main(): # Initialize the SQLite database initialize_database() + # Check db config for wipe_on_restart + db_config = relay_config.get("db", {}) + msg_map_config = db_config.get("msg_map", {}) + wipe_on_restart = msg_map_config.get("wipe_on_restart", False) + + if wipe_on_restart: + logger.debug("wipe_on_restart enabled. Wiping message_map now (startup).") + wipe_message_map() + # Load plugins early load_plugins() @@ -131,6 +142,11 @@ async def shutdown(): except Exception as e: meshtastic_logger.warning(f"Error closing Meshtastic client: {e}") + # Attempt to wipe message_map on shutdown if enabled + if wipe_on_restart: + logger.debug("wipe_on_restart enabled. Wiping message_map now (shutdown).") + wipe_message_map() + # Cancel the reconnect task if it exists if meshtastic_utils.reconnect_task: meshtastic_utils.reconnect_task.cancel() diff --git a/matrix_utils.py b/matrix_utils.py index cf060ab..462fe53 100644 --- a/matrix_utils.py +++ b/matrix_utils.py @@ -26,7 +26,7 @@ # Do not import plugin_loader here to avoid circular imports from meshtastic_utils import connect_meshtastic -from db_utils import get_message_map_by_matrix_event_id +from db_utils import get_message_map_by_matrix_event_id, prune_message_map, store_message_map # Extract Matrix configuration matrix_homeserver = relay_config["matrix"]["homeserver"] @@ -132,19 +132,23 @@ async def matrix_relay(room_id, message, longname, shortname, meshnet_name, port """ Relay a message from Meshtastic to Matrix, optionally storing message maps. - :param room_id: The Matrix room ID to send to. - :param message: The text message to send. - :param longname: The sender's longname on the meshnet. - :param shortname: The sender's shortname on the meshnet. - :param meshnet_name: The meshnet name passed in, but we will always override with our own relay's meshnet_name. - :param portnum: The portnum or message type source from Meshtastic. - :param meshtastic_id: The Meshtastic ID of the message, if any. - :param meshtastic_replyId: The Meshtastic replyId if this is a reaction. - :param meshtastic_text: The original Meshtastic text message if available. - :param emote: If True, send as an emote instead of a normal message. - :param emoji: If True, indicates this was originally a reaction. + IMPORTANT CHANGE: Now, we only store message maps if `relay_reactions` is True. + If `relay_reactions` is False, we skip storing to the message map entirely. + This helps maintain privacy and prevents message_map usage unless needed. + + Additionally, if `msgs_to_keep` > 0, we prune the oldest messages after storing + to prevent database bloat and maintain privacy. """ matrix_client = await connect_matrix() + + # Retrieve relay_reactions configuration; default to False now if not specified. + relay_reactions = relay_config["meshtastic"].get("relay_reactions", False) + + # Retrieve db config for message_map pruning + db_config = relay_config.get("db", {}) + msg_map_config = db_config.get("msg_map", {}) + msgs_to_keep = msg_map_config.get("msgs_to_keep", 500) # Default is 500 if not specified + try: # Always use our own local meshnet_name for outgoing events local_meshnet_name = relay_config["meshtastic"]["meshnet_name"] @@ -175,11 +179,19 @@ async def matrix_relay(room_id, message, longname, shortname, meshnet_name, port ) logger.info(f"Sent inbound radio message to matrix room: {room_id}") - # For inbound meshtastic->matrix messages, store mapping here if meshtastic_id is present and not a reaction - if meshtastic_id is not None and not emote: - from db_utils import store_message_map - # Always store our own local meshnet_name to identify origin - store_message_map(meshtastic_id, response.event_id, room_id, meshtastic_text if meshtastic_text else message, meshtastic_meshnet=local_meshnet_name) + # Only store message map if relay_reactions is True and meshtastic_id is present and not an emote. + # If relay_reactions is False, we skip storing entirely. + if relay_reactions and meshtastic_id is not None and not emote: + # Store the message map + store_message_map( + meshtastic_id, response.event_id, room_id, + meshtastic_text if meshtastic_text else message, + meshtastic_meshnet=local_meshnet_name + ) + + # If msgs_to_keep > 0, prune old messages after inserting a new one + if msgs_to_keep > 0: + prune_message_map(msgs_to_keep) except asyncio.TimeoutError: logger.error("Timed out while waiting for Matrix response") @@ -211,8 +223,13 @@ async def on_room_message( especially remote reactions that got relayed into the room as m.emote events, as we do not store them in the database. If we can't find the original message in the DB, it likely means it's a reaction to a reaction, and we stop there. + + Additionally, we only deal with message_map storage (and thus reaction linking) + if relay_reactions is True. If it's False, none of these mappings are stored or used. """ - from db_utils import store_message_map + # Importing here to avoid circular imports and to keep logic consistent + # Note: We do not call store_message_map directly here for inbound matrix->mesh messages. + # That logic occurs inside matrix_relay if needed. full_display_name = "Unknown user" message_timestamp = event.server_timestamp @@ -236,6 +253,9 @@ async def on_room_message( reaction_emoji = None original_matrix_event_id = None + # Retrieve relay_reactions option from config, now defaulting to False + relay_reactions = relay_config["meshtastic"].get("relay_reactions", False) + # Check if this is a Matrix ReactionEvent (usually m.reaction) if isinstance(event, ReactionEvent): # This is a reaction event @@ -265,9 +285,6 @@ async def on_room_message( meshtastic_replyId = event.source["content"].get("meshtastic_replyId") suppress = event.source["content"].get("mmrelay_suppress") - # Retrieve the relay_reactions option from config - relay_reactions = relay_config["meshtastic"].get("relay_reactions", True) - # If a message has suppress flag, do not process if suppress: return @@ -294,7 +311,7 @@ async def on_room_message( if not shortname: shortname = longname[:3] if longname else "???" - # Use meshtastic_text from content if available, this is the original message text from the remote mesh + # Use meshtastic_text from content if available meshtastic_text_db = event.source["content"].get("meshtastic_text", "") meshtastic_text_db = meshtastic_text_db.replace('\n', ' ').replace('\r', ' ') abbreviated_text = meshtastic_text_db[:40] + "..." if len(meshtastic_text_db) > 40 else meshtastic_text_db @@ -321,8 +338,7 @@ async def on_room_message( if original_matrix_event_id: orig = get_message_map_by_matrix_event_id(original_matrix_event_id) if not orig: - # If we don't find the original message in the DB, we suspect it's a reaction to a reaction or - # something we never recorded. In either case, we do not forward. + # If we don't find the original message in the DB, we suspect it's a reaction-to-reaction scenario logger.debug("Original message for reaction not found in DB. Possibly a reaction-to-reaction scenario. Not forwarding.") return @@ -418,6 +434,8 @@ async def on_room_message( meshtastic_channel = room_config["meshtastic_channel"] # If message is from Matrix and broadcast_enabled is True, relay to Meshtastic + # Note: If relay_reactions is False, we won't store message_map, but we can still relay. + # The lack of message_map storage just means no reaction bridging will occur. if not found_matching_plugin and event.sender != bot_user_id: if relay_config["meshtastic"]["broadcast_enabled"]: portnum = event.source["content"].get("meshtastic_portnum") @@ -431,9 +449,15 @@ async def on_room_message( channelIndex=meshtastic_channel, portNum=meshtastic.protobuf.portnums_pb2.PortNum.DETECTION_SENSOR_APP, ) - # If we got a packet with an id and it's not a reaction, store mapping - if sent_packet and hasattr(sent_packet, 'id'): + # If relay_reactions is True, we store the message map for these messages as well. + # If False, skip storing. + if relay_reactions and sent_packet and hasattr(sent_packet, 'id'): store_message_map(sent_packet.id, event.event_id, room.room_id, text, meshtastic_meshnet=local_meshnet_name) + db_config = relay_config.get("db", {}) + msg_map_config = db_config.get("msg_map", {}) + msgs_to_keep = msg_map_config.get("msgs_to_keep", 500) + if msgs_to_keep > 0: + prune_message_map(msgs_to_keep) else: meshtastic_logger.debug( f"Detection sensor packet received from {full_display_name}, but detection sensor processing is disabled." @@ -445,8 +469,14 @@ async def on_room_message( sent_packet = meshtastic_interface.sendText( text=full_message, channelIndex=meshtastic_channel ) - if sent_packet and hasattr(sent_packet, 'id'): + # Store message_map only if relay_reactions is True + if relay_reactions and sent_packet and hasattr(sent_packet, 'id'): store_message_map(sent_packet.id, event.event_id, room.room_id, text, meshtastic_meshnet=local_meshnet_name) + db_config = relay_config.get("db", {}) + msg_map_config = db_config.get("msg_map", {}) + msgs_to_keep = msg_map_config.get("msgs_to_keep", 500) + if msgs_to_keep > 0: + prune_message_map(msgs_to_keep) else: logger.debug( f"Broadcast not supported: Message from {full_display_name} dropped." diff --git a/meshtastic_utils.py b/meshtastic_utils.py index ed8520b..ff2285f 100644 --- a/meshtastic_utils.py +++ b/meshtastic_utils.py @@ -221,14 +221,14 @@ async def reconnect(): def on_meshtastic_message(packet, interface): """ - Handle incoming Meshtastic messages. For reaction messages, we now ensure that when relaying to Matrix, - we use the original message's meshtastic_meshnet from the DB so that remote-originated messages can properly - have their reactions relayed across multiple meshnets. + Handle incoming Meshtastic messages. For reaction messages, if relay_reactions is False, + we do not store message maps and thus won't be able to relay reactions back to Matrix. + If relay_reactions is True, message maps are stored inside matrix_relay(). """ # Apply reaction filtering based on config - relay_reactions = relay_config["meshtastic"].get("relay_reactions", True) + relay_reactions = relay_config["meshtastic"].get("relay_reactions", False) - # If relay_reactions is False, filter out reaction/tapback packets + # If relay_reactions is False, filter out reaction/tapback packets to avoid complexity if packet.get('decoded', {}).get('portnum') == 'TEXT_MESSAGE_APP': decoded = packet.get('decoded', {}) if not relay_reactions and ('emoji' in decoded or 'replyId' in decoded): @@ -282,7 +282,7 @@ def on_meshtastic_message(packet, interface): matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet = orig abbreviated_text = meshtastic_text[:40] + "..." if len(meshtastic_text) > 40 else meshtastic_text - # Ensure that meshnet_name is always included, using the our own meshnet for accuracy. + # Ensure that meshnet_name is always included, using our own meshnet for accuracy. full_display_name = f"{longname}/{meshnet_name}" reaction_symbol = text.strip() if (text and text.strip()) else '⚠️' @@ -408,8 +408,8 @@ def on_meshtastic_message(packet, interface): logger.info(f"Relaying Meshtastic message from {longname} to Matrix") for room in matrix_rooms: if room["meshtastic_channel"] == channel: - # For inbound meshtastic->matrix messages, store meshnet_name in message_map - # so that future reactions can properly identify origin. + # Storing the message_map (if enabled) occurs inside matrix_relay() now, + # controlled by relay_reactions. asyncio.run_coroutine_threadsafe( matrix_relay( room["id"], diff --git a/sample_config.yaml b/sample_config.yaml index d715350..e5e42ff 100644 --- a/sample_config.yaml +++ b/sample_config.yaml @@ -27,6 +27,11 @@ logging: #max_log_size: 10485760 # 10 MB default if omitted #backup_count: 1 # Keeps 1 backup as the default if omitted +#db: +# msg_map: # The message map is necessary for the relay_reactions functionality. If `relay_reactions` is set to false, nothing will be saved to the message map. +# msgs_to_keep: 500 # If set to 0, it will not delete any messages; Defaults to 500 +# wipe_on_restart: true # Clears out the message map when the relay is restarted; Defaults to False + # These are core Plugins - Note: Some plugins are experimental and some need maintenance. plugins: ping: From f362fae3661d7527fda0c6722fa5b78a71d7fa08 Mon Sep 17 00:00:00 2001 From: Jeremiah K <17190268+jeremiah-k@users.noreply.github.com> Date: Fri, 13 Dec 2024 10:07:45 -0600 Subject: [PATCH 2/2] Trunk formatting auto fixes --- .trunk/trunk.yaml | 12 ++-- db_utils.py | 38 +++++++++++-- log_utils.py | 2 +- main.py | 9 ++- matrix_utils.py | 134 +++++++++++++++++++++++++++++++++----------- meshtastic_utils.py | 56 +++++++++++++----- 6 files changed, 190 insertions(+), 61 deletions(-) diff --git a/.trunk/trunk.yaml b/.trunk/trunk.yaml index f0d528e..ce09662 100644 --- a/.trunk/trunk.yaml +++ b/.trunk/trunk.yaml @@ -7,12 +7,12 @@ cli: plugins: sources: - id: trunk - ref: v1.6.5 + ref: v1.6.6 uri: https://github.com/trunk-io/plugins # Many linters and tools depend on runtimes - configure them here. (https://docs.trunk.io/runtimes) runtimes: enabled: - - node@18.12.1 + - node@18.20.5 - python@3.10.8 # This is the section where you manage your linters. (https://docs.trunk.io/check/configuration) lint: @@ -21,14 +21,14 @@ lint: - actionlint@1.7.4 - bandit@1.8.0 - black@24.10.0 - - checkov@3.2.317 + - checkov@3.2.334 - git-diff-check - isort@5.13.2 - markdownlint@0.43.0 - osv-scanner@1.9.1 - - prettier@3.4.1 - - ruff@0.8.0 - - trufflehog@3.84.1 + - prettier@3.4.2 + - ruff@0.8.3 + - trufflehog@3.86.1 - yamllint@1.35.1 actions: disabled: diff --git a/db_utils.py b/db_utils.py index b5eaa48..d86e070 100644 --- a/db_utils.py +++ b/db_utils.py @@ -5,6 +5,7 @@ logger = get_logger(name="db_utils") + # Initialize SQLite database def initialize_database(): with sqlite3.connect("meshtastic.sqlite") as conn: @@ -37,6 +38,7 @@ def initialize_database(): conn.commit() + def store_plugin_data(plugin_name, meshtastic_id, data): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -46,6 +48,7 @@ def store_plugin_data(plugin_name, meshtastic_id, data): ) conn.commit() + def delete_plugin_data(plugin_name, meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -55,6 +58,7 @@ def delete_plugin_data(plugin_name, meshtastic_id): ) conn.commit() + # Get the data for a given plugin and Meshtastic ID def get_plugin_data_for_node(plugin_name, meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: @@ -69,6 +73,7 @@ def get_plugin_data_for_node(plugin_name, meshtastic_id): result = cursor.fetchone() return json.loads(result[0] if result else "[]") + # Get the data for a given plugin def get_plugin_data(plugin_name): with sqlite3.connect("meshtastic.sqlite") as conn: @@ -79,6 +84,7 @@ def get_plugin_data(plugin_name): ) return cursor.fetchall() + # Get the longname for a given Meshtastic ID def get_longname(meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: @@ -89,6 +95,7 @@ def get_longname(meshtastic_id): result = cursor.fetchone() return result[0] if result else None + def save_longname(meshtastic_id, longname): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -98,6 +105,7 @@ def save_longname(meshtastic_id, longname): ) conn.commit() + def update_longnames(nodes): if nodes: for node in nodes.values(): @@ -107,6 +115,7 @@ def update_longnames(nodes): longname = user.get("longName", "N/A") save_longname(meshtastic_id, longname) + def get_shortname(meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -116,6 +125,7 @@ def get_shortname(meshtastic_id): result = cursor.fetchone() return result[0] if result else None + def save_shortname(meshtastic_id, shortname): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -125,6 +135,7 @@ def save_shortname(meshtastic_id, shortname): ) conn.commit() + def update_shortnames(nodes): if nodes: for node in nodes.values(): @@ -134,7 +145,14 @@ def update_shortnames(nodes): shortname = user.get("shortName", "N/A") save_shortname(meshtastic_id, shortname) -def store_message_map(meshtastic_id, matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet=None): + +def store_message_map( + meshtastic_id, + matrix_event_id, + matrix_room_id, + meshtastic_text, + meshtastic_meshnet=None, +): """ Stores a message map in the database. @@ -152,10 +170,17 @@ def store_message_map(meshtastic_id, matrix_event_id, matrix_room_id, meshtastic ) cursor.execute( "INSERT OR REPLACE INTO message_map (meshtastic_id, matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) VALUES (?, ?, ?, ?, ?)", - (meshtastic_id, matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet), + ( + meshtastic_id, + matrix_event_id, + matrix_room_id, + meshtastic_text, + meshtastic_meshnet, + ), ) conn.commit() + def get_message_map_by_meshtastic_id(meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -172,6 +197,7 @@ def get_message_map_by_meshtastic_id(meshtastic_id): return result[0], result[1], result[2], result[3] return None + def get_message_map_by_matrix_event_id(matrix_event_id): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -188,6 +214,7 @@ def get_message_map_by_matrix_event_id(matrix_event_id): return result[0], result[1], result[2], result[3] return None + def wipe_message_map(): """ Wipes all entries from the message_map table. @@ -199,6 +226,7 @@ def wipe_message_map(): conn.commit() logger.info("message_map table wiped successfully.") + def prune_message_map(msgs_to_keep): """ Prune the message_map table to keep only the most recent msgs_to_keep entries @@ -222,7 +250,9 @@ def prune_message_map(msgs_to_keep): to_delete = total - msgs_to_keep cursor.execute( "DELETE FROM message_map WHERE rowid IN (SELECT rowid FROM message_map ORDER BY rowid ASC LIMIT ?)", - (to_delete,) + (to_delete,), ) conn.commit() - logger.info(f"Pruned {to_delete} old message_map entries, keeping last {msgs_to_keep}.") + logger.info( + f"Pruned {to_delete} old message_map entries, keeping last {msgs_to_keep}." + ) diff --git a/log_utils.py b/log_utils.py index 92e2269..7a43c30 100644 --- a/log_utils.py +++ b/log_utils.py @@ -40,7 +40,7 @@ def get_logger(name): "backup_count", 1 ) # Default to 1 backup file_handler = RotatingFileHandler( - log_file, maxBytes=max_bytes, backupCount=backup_count, encoding='utf-8' + log_file, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8" ) file_handler.setFormatter( diff --git a/main.py b/main.py index 778efc6..24f54d0 100644 --- a/main.py +++ b/main.py @@ -9,12 +9,17 @@ import sys from typing import List -from nio import RoomMessageNotice, RoomMessageText, ReactionEvent, RoomMessageEmote +from nio import ReactionEvent, RoomMessageEmote, RoomMessageNotice, RoomMessageText # Import meshtastic_utils as a module to set event_loop import meshtastic_utils from config import relay_config -from db_utils import initialize_database, update_longnames, update_shortnames, wipe_message_map +from db_utils import ( + initialize_database, + update_longnames, + update_shortnames, + wipe_message_map, +) from log_utils import get_logger from matrix_utils import connect_matrix, join_matrix_room from matrix_utils import logger as matrix_logger diff --git a/matrix_utils.py b/matrix_utils.py index 462fe53..acf910c 100644 --- a/matrix_utils.py +++ b/matrix_utils.py @@ -11,22 +11,25 @@ AsyncClient, AsyncClientConfig, MatrixRoom, + ReactionEvent, + RoomMessageEmote, RoomMessageNotice, RoomMessageText, - RoomMessageEmote, UploadResponse, WhoamiError, - ReactionEvent, ) - from PIL import Image from config import relay_config +from db_utils import ( + get_message_map_by_matrix_event_id, + prune_message_map, + store_message_map, +) from log_utils import get_logger # Do not import plugin_loader here to avoid circular imports from meshtastic_utils import connect_meshtastic -from db_utils import get_message_map_by_matrix_event_id, prune_message_map, store_message_map # Extract Matrix configuration matrix_homeserver = relay_config["matrix"]["homeserver"] @@ -43,10 +46,12 @@ matrix_client = None + def bot_command(command, payload): # Checks if the given command is directed at the bot return f"{bot_user_name}: !{command}" in payload + async def connect_matrix(): """ Establish a connection to the Matrix homeserver. @@ -94,6 +99,7 @@ async def connect_matrix(): return matrix_client + async def join_matrix_room(matrix_client, room_id_or_alias: str) -> None: """Join a Matrix room by its ID or alias.""" try: @@ -128,7 +134,20 @@ async def join_matrix_room(matrix_client, room_id_or_alias: str) -> None: except Exception as e: logger.error(f"Error joining room '{room_id_or_alias}': {e}") -async def matrix_relay(room_id, message, longname, shortname, meshnet_name, portnum, meshtastic_id=None, meshtastic_replyId=None, meshtastic_text=None, emote=False, emoji=False): + +async def matrix_relay( + room_id, + message, + longname, + shortname, + meshnet_name, + portnum, + meshtastic_id=None, + meshtastic_replyId=None, + meshtastic_text=None, + emote=False, + emoji=False, +): """ Relay a message from Meshtastic to Matrix, optionally storing message maps. @@ -147,7 +166,9 @@ async def matrix_relay(room_id, message, longname, shortname, meshnet_name, port # Retrieve db config for message_map pruning db_config = relay_config.get("db", {}) msg_map_config = db_config.get("msg_map", {}) - msgs_to_keep = msg_map_config.get("msgs_to_keep", 500) # Default is 500 if not specified + msgs_to_keep = msg_map_config.get( + "msgs_to_keep", 500 + ) # Default is 500 if not specified try: # Always use our own local meshnet_name for outgoing events @@ -184,9 +205,11 @@ async def matrix_relay(room_id, message, longname, shortname, meshnet_name, port if relay_reactions and meshtastic_id is not None and not emote: # Store the message map store_message_map( - meshtastic_id, response.event_id, room_id, + meshtastic_id, + response.event_id, + room_id, meshtastic_text if meshtastic_text else message, - meshtastic_meshnet=local_meshnet_name + meshtastic_meshnet=local_meshnet_name, ) # If msgs_to_keep > 0, prune old messages after inserting a new one @@ -198,9 +221,8 @@ async def matrix_relay(room_id, message, longname, shortname, meshnet_name, port except Exception as e: logger.error(f"Error sending radio message to matrix room {room_id}: {e}") -def truncate_message( - text, max_bytes=227 -): + +def truncate_message(text, max_bytes=227): """ Truncate the given text to fit within the specified byte size. @@ -211,9 +233,11 @@ def truncate_message( truncated_text = text.encode("utf-8")[:max_bytes].decode("utf-8", "ignore") return truncated_text + # Callback for new messages in Matrix room async def on_room_message( - room: MatrixRoom, event: Union[RoomMessageText, RoomMessageNotice, ReactionEvent, RoomMessageEmote] + room: MatrixRoom, + event: Union[RoomMessageText, RoomMessageNotice, ReactionEvent, RoomMessageEmote], ) -> None: """ Handle new messages and reactions in Matrix. For reactions, we ensure that when relaying back @@ -265,7 +289,9 @@ async def on_room_message( # Extract the reaction emoji and the original event it relates to reaction_emoji = relates_to["key"] original_matrix_event_id = relates_to["event_id"] - logger.debug(f"Original matrix event ID: {original_matrix_event_id}, Reaction emoji: {reaction_emoji}") + logger.debug( + f"Original matrix event ID: {original_matrix_event_id}, Reaction emoji: {reaction_emoji}" + ) # Check if this is a Matrix RoomMessageEmote (m.emote) if isinstance(event, RoomMessageEmote): @@ -291,7 +317,9 @@ async def on_room_message( # If this is a reaction and relay_reactions is False, do nothing if is_reaction and not relay_reactions: - logger.debug("Reaction event encountered but relay_reactions is disabled. Doing nothing.") + logger.debug( + "Reaction event encountered but relay_reactions is disabled. Doing nothing." + ) return local_meshnet_name = relay_config["meshtastic"]["meshnet_name"] @@ -301,7 +329,12 @@ async def on_room_message( # Check if we need to relay a reaction from a remote meshnet to our local meshnet. # If meshnet_name != local_meshnet_name and meshtastic_replyId is present and this is an emote, # it's a remote reaction that needs to be forwarded as a text message describing the reaction. - if meshnet_name and meshnet_name != local_meshnet_name and meshtastic_replyId and isinstance(event, RoomMessageEmote): + if ( + meshnet_name + and meshnet_name != local_meshnet_name + and meshtastic_replyId + and isinstance(event, RoomMessageEmote) + ): logger.info(f"Relaying reaction from remote meshnet: {meshnet_name}") short_meshnet_name = meshnet_name[:4] @@ -313,21 +346,30 @@ async def on_room_message( # Use meshtastic_text from content if available meshtastic_text_db = event.source["content"].get("meshtastic_text", "") - meshtastic_text_db = meshtastic_text_db.replace('\n', ' ').replace('\r', ' ') - abbreviated_text = meshtastic_text_db[:40] + "..." if len(meshtastic_text_db) > 40 else meshtastic_text_db + meshtastic_text_db = meshtastic_text_db.replace("\n", " ").replace( + "\r", " " + ) + abbreviated_text = ( + meshtastic_text_db[:40] + "..." + if len(meshtastic_text_db) > 40 + else meshtastic_text_db + ) - reaction_message = f"{shortname}/{short_meshnet_name} reacted {reaction_emoji} to \"{abbreviated_text}\"" + reaction_message = f'{shortname}/{short_meshnet_name} reacted {reaction_emoji} to "{abbreviated_text}"' # Relay the remote reaction to the local meshnet. meshtastic_interface = connect_meshtastic() from meshtastic_utils import logger as meshtastic_logger + meshtastic_channel = room_config["meshtastic_channel"] if relay_config["meshtastic"]["broadcast_enabled"]: meshtastic_logger.info( f"Relaying reaction from remote meshnet {meshnet_name} to radio broadcast" ) - logger.debug(f"Sending reaction to Meshtastic with meshnet={local_meshnet_name}: {reaction_message}") + logger.debug( + f"Sending reaction to Meshtastic with meshnet={local_meshnet_name}: {reaction_message}" + ) meshtastic_interface.sendText( text=reaction_message, channelIndex=meshtastic_channel ) @@ -339,30 +381,43 @@ async def on_room_message( orig = get_message_map_by_matrix_event_id(original_matrix_event_id) if not orig: # If we don't find the original message in the DB, we suspect it's a reaction-to-reaction scenario - logger.debug("Original message for reaction not found in DB. Possibly a reaction-to-reaction scenario. Not forwarding.") + logger.debug( + "Original message for reaction not found in DB. Possibly a reaction-to-reaction scenario. Not forwarding." + ) return # orig = (meshtastic_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) - meshtastic_id, matrix_room_id, meshtastic_text_db, meshtastic_meshnet_db = orig + meshtastic_id, matrix_room_id, meshtastic_text_db, meshtastic_meshnet_db = ( + orig + ) display_name_response = await matrix_client.get_displayname(event.sender) full_display_name = display_name_response.displayname or event.sender # If not from a remote meshnet, proceed as normal to relay back to the originating meshnet short_display_name = full_display_name[:5] prefix = f"{short_display_name}[M]: " - abbreviated_text = meshtastic_text_db[:40] + "..." if len(meshtastic_text_db) > 40 else meshtastic_text_db + abbreviated_text = ( + meshtastic_text_db[:40] + "..." + if len(meshtastic_text_db) > 40 + else meshtastic_text_db + ) # Always use our local meshnet_name for outgoing events - reaction_message = f"{prefix}reacted {reaction_emoji} to \"{abbreviated_text}\"" + reaction_message = ( + f'{prefix}reacted {reaction_emoji} to "{abbreviated_text}"' + ) meshtastic_interface = connect_meshtastic() from meshtastic_utils import logger as meshtastic_logger + meshtastic_channel = room_config["meshtastic_channel"] if relay_config["meshtastic"]["broadcast_enabled"]: meshtastic_logger.info( f"Relaying reaction from {full_display_name} to radio broadcast" ) - logger.debug(f"Sending reaction to Meshtastic with meshnet={local_meshnet_name}: {reaction_message}") + logger.debug( + f"Sending reaction to Meshtastic with meshnet={local_meshnet_name}: {reaction_message}" + ) meshtastic_interface.sendText( text=reaction_message, channelIndex=meshtastic_channel ) @@ -381,9 +436,7 @@ async def on_room_message( if shortname is None: shortname = longname[:3] if longname else "???" # Remove the original prefix "[longname/meshnet]: " to avoid double-tagging - text = re.sub( - rf"^\[{full_display_name}\]: ", "", text - ) + text = re.sub(rf"^\[{full_display_name}\]: ", "", text) text = truncate_message(text) full_message = f"{shortname}/{short_meshnet_name}: {text}" else: @@ -401,6 +454,7 @@ async def on_room_message( # Plugin functionality from plugin_loader import load_plugins + plugins = load_plugins() found_matching_plugin = False @@ -439,9 +493,7 @@ async def on_room_message( if not found_matching_plugin and event.sender != bot_user_id: if relay_config["meshtastic"]["broadcast_enabled"]: portnum = event.source["content"].get("meshtastic_portnum") - if ( - portnum == "DETECTION_SENSOR_APP" - ): + if portnum == "DETECTION_SENSOR_APP": # If detection_sensor is enabled, forward this data as detection sensor data if relay_config["meshtastic"].get("detection_sensor", False): sent_packet = meshtastic_interface.sendData( @@ -451,8 +503,14 @@ async def on_room_message( ) # If relay_reactions is True, we store the message map for these messages as well. # If False, skip storing. - if relay_reactions and sent_packet and hasattr(sent_packet, 'id'): - store_message_map(sent_packet.id, event.event_id, room.room_id, text, meshtastic_meshnet=local_meshnet_name) + if relay_reactions and sent_packet and hasattr(sent_packet, "id"): + store_message_map( + sent_packet.id, + event.event_id, + room.room_id, + text, + meshtastic_meshnet=local_meshnet_name, + ) db_config = relay_config.get("db", {}) msg_map_config = db_config.get("msg_map", {}) msgs_to_keep = msg_map_config.get("msgs_to_keep", 500) @@ -470,8 +528,14 @@ async def on_room_message( text=full_message, channelIndex=meshtastic_channel ) # Store message_map only if relay_reactions is True - if relay_reactions and sent_packet and hasattr(sent_packet, 'id'): - store_message_map(sent_packet.id, event.event_id, room.room_id, text, meshtastic_meshnet=local_meshnet_name) + if relay_reactions and sent_packet and hasattr(sent_packet, "id"): + store_message_map( + sent_packet.id, + event.event_id, + room.room_id, + text, + meshtastic_meshnet=local_meshnet_name, + ) db_config = relay_config.get("db", {}) msg_map_config = db_config.get("msg_map", {}) msgs_to_keep = msg_map_config.get("msgs_to_keep", 500) @@ -482,6 +546,7 @@ async def on_room_message( f"Broadcast not supported: Message from {full_display_name} dropped." ) + async def upload_image( client: AsyncClient, image: Image.Image, filename: str ) -> UploadResponse: @@ -501,6 +566,7 @@ async def upload_image( return response + async def send_room_image( client: AsyncClient, room_id: str, upload_response: UploadResponse ): diff --git a/meshtastic_utils.py b/meshtastic_utils.py index ff2285f..4da6975 100644 --- a/meshtastic_utils.py +++ b/meshtastic_utils.py @@ -12,7 +12,13 @@ from pubsub import pub from config import relay_config -from db_utils import get_longname, get_shortname, save_longname, save_shortname, get_message_map_by_meshtastic_id +from db_utils import ( + get_longname, + get_message_map_by_meshtastic_id, + get_shortname, + save_longname, + save_shortname, +) from log_utils import get_logger # Do not import plugin_loader here to avoid circular imports @@ -27,12 +33,15 @@ meshtastic_client = None event_loop = None # Will be set from main.py -meshtastic_lock = threading.Lock() # To prevent race conditions on meshtastic_client access +meshtastic_lock = ( + threading.Lock() +) # To prevent race conditions on meshtastic_client access reconnecting = False shutting_down = False reconnect_task = None # To keep track of the reconnect task + def serial_port_exists(port_name): """ Check if the specified serial port exists. @@ -41,6 +50,7 @@ def serial_port_exists(port_name): ports = [port.device for port in serial.tools.list_ports.comports()] return port_name in ports + def connect_meshtastic(force_connect=False): """ Establish a connection to the Meshtastic device. @@ -141,7 +151,9 @@ def connect_meshtastic(force_connect=False): break attempts += 1 if retry_limit == 0 or attempts <= retry_limit: - wait_time = min(attempts * 2, 30) # Exponential backoff capped at 30s + wait_time = min( + attempts * 2, 30 + ) # Exponential backoff capped at 30s logger.warning( f"Attempt #{attempts - 1} failed. Retrying in {wait_time} secs: {e}" ) @@ -152,6 +164,7 @@ def connect_meshtastic(force_connect=False): return meshtastic_client + def on_lost_meshtastic_connection(interface=None): """ Callback invoked when the Meshtastic connection is lost. @@ -186,6 +199,7 @@ def on_lost_meshtastic_connection(interface=None): if event_loop: reconnect_task = asyncio.run_coroutine_threadsafe(reconnect(), event_loop) + async def reconnect(): """ Asynchronously attempts to reconnect with exponential backoff. @@ -219,6 +233,7 @@ async def reconnect(): finally: reconnecting = False + def on_meshtastic_message(packet, interface): """ Handle incoming Meshtastic messages. For reaction messages, if relay_reactions is False, @@ -229,10 +244,12 @@ def on_meshtastic_message(packet, interface): relay_reactions = relay_config["meshtastic"].get("relay_reactions", False) # If relay_reactions is False, filter out reaction/tapback packets to avoid complexity - if packet.get('decoded', {}).get('portnum') == 'TEXT_MESSAGE_APP': - decoded = packet.get('decoded', {}) - if not relay_reactions and ('emoji' in decoded or 'replyId' in decoded): - logger.debug('Filtered out reaction/tapback packet due to relay_reactions=false.') + if packet.get("decoded", {}).get("portnum") == "TEXT_MESSAGE_APP": + decoded = packet.get("decoded", {}) + if not relay_reactions and ("emoji" in decoded or "replyId" in decoded): + logger.debug( + "Filtered out reaction/tapback packet due to relay_reactions=false." + ) return from matrix_utils import matrix_relay @@ -255,10 +272,11 @@ def on_meshtastic_message(packet, interface): decoded = packet.get("decoded", {}) text = decoded.get("text") replyId = decoded.get("replyId") - emoji_flag = 'emoji' in decoded and decoded['emoji'] == 1 + emoji_flag = "emoji" in decoded and decoded["emoji"] == 1 # Determine if this is a direct message to the relay node from meshtastic.mesh_interface import BROADCAST_NUM + myId = interface.myInfo.my_node_num if toId == myId: @@ -280,13 +298,17 @@ def on_meshtastic_message(packet, interface): if orig: # orig = (matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet = orig - abbreviated_text = meshtastic_text[:40] + "..." if len(meshtastic_text) > 40 else meshtastic_text + abbreviated_text = ( + meshtastic_text[:40] + "..." + if len(meshtastic_text) > 40 + else meshtastic_text + ) # Ensure that meshnet_name is always included, using our own meshnet for accuracy. full_display_name = f"{longname}/{meshnet_name}" - reaction_symbol = text.strip() if (text and text.strip()) else '⚠️' - reaction_message = f"\n [{full_display_name}] reacted {reaction_symbol} to \"{abbreviated_text}\"" + reaction_symbol = text.strip() if (text and text.strip()) else "⚠️" + reaction_message = f'\n [{full_display_name}] reacted {reaction_symbol} to "{abbreviated_text}"' # Relay the reaction as emote to Matrix, preserving the original meshnet name asyncio.run_coroutine_threadsafe( @@ -301,7 +323,7 @@ def on_meshtastic_message(packet, interface): meshtastic_replyId=replyId, meshtastic_text=meshtastic_text, emote=True, - emoji=True + emoji=True, ), loop=loop, ) @@ -340,7 +362,9 @@ def on_meshtastic_message(packet, interface): return # If detection_sensor is disabled and this is a detection sensor packet, skip it - if decoded.get("portnum") == "DETECTION_SENSOR_APP" and not relay_config["meshtastic"].get("detection_sensor", False): + if decoded.get("portnum") == "DETECTION_SENSOR_APP" and not relay_config[ + "meshtastic" + ].get("detection_sensor", False): logger.debug( "Detection sensor packet received, but detection sensor processing is disabled." ) @@ -376,6 +400,7 @@ def on_meshtastic_message(packet, interface): # Plugin functionality - Check if any plugin handles this message before relaying from plugin_loader import load_plugins + plugins = load_plugins() found_matching_plugin = False @@ -419,7 +444,7 @@ def on_meshtastic_message(packet, interface): meshnet_name, decoded.get("portnum"), meshtastic_id=packet.get("id"), - meshtastic_text=text + meshtastic_text=text, ), loop=loop, ) @@ -427,6 +452,7 @@ def on_meshtastic_message(packet, interface): # Non-text messages via plugins portnum = decoded.get("portnum") from plugin_loader import load_plugins + plugins = load_plugins() found_matching_plugin = False for plugin in plugins: @@ -446,6 +472,7 @@ def on_meshtastic_message(packet, interface): f"Processed {portnum} with plugin {plugin.plugin_name}" ) + async def check_connection(): """ Periodically checks the Meshtastic connection by sending a ping. @@ -462,6 +489,7 @@ async def check_connection(): on_lost_meshtastic_connection(meshtastic_client) await asyncio.sleep(5) # Check connection every 5 seconds + if __name__ == "__main__": # If running this standalone (normally the main.py does the loop), just try connecting and run forever. meshtastic_client = connect_meshtastic()