-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathmatrix_utils.py
611 lines (528 loc) · 24.3 KB
/
matrix_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
import asyncio
import io
import re
import ssl
import time
from typing import List, Union
import certifi
import meshtastic.protobuf.portnums_pb2
from nio import (
AsyncClient,
AsyncClientConfig,
MatrixRoom,
ReactionEvent,
RoomMessageEmote,
RoomMessageNotice,
RoomMessageText,
UploadResponse,
WhoamiError,
)
from PIL import Image
from config import relay_config
from db_utils import (
get_message_map_by_matrix_event_id,
prune_message_map,
store_message_map,
)
from log_utils import get_logger
# Do not import plugin_loader here to avoid circular imports
from meshtastic_utils import connect_meshtastic
# Extract Matrix configuration from the relay config loaded at import time.
matrix_homeserver = relay_config["matrix"]["homeserver"]
# One dict per bridged room; this module reads the "id" and
# "meshtastic_channel" keys, and join_matrix_room() rewrites "id" in place
# when it resolves a room alias to a room ID.
matrix_rooms: List[dict] = relay_config["matrix_rooms"]
matrix_access_token = relay_config["matrix"]["access_token"]
bot_user_id = relay_config["matrix"]["bot_user_id"]
bot_user_name = None  # Detected upon logon (set by connect_matrix())
bot_start_time = int(
    time.time() * 1000
)  # Timestamp (ms) when the bot starts, used to filter out old messages

logger = get_logger(name="Matrix")

# Shared client instance; created lazily by connect_matrix().
matrix_client = None
def bot_command(command, event):
    """
    Check whether a Matrix message is ``!<command>`` addressed to the bot.

    Both the plain body and the HTML ``formatted_body`` (with tags stripped)
    are examined, because Matrix clients render mentions differently. The
    message must start with the bot's user ID or display name and must then
    consist of a mention, an optional ``,``/``:``/``;`` separator, optional
    whitespace and ``!<command>`` with nothing after it.

    :param command: Command name without the leading ``!``; matched literally.
    :param event: Message event exposing ``body`` and ``source``.
    :return: True if the message is the given command directed at the bot.
    """
    full_message = event.body.strip()
    content = event.source.get("content", {})
    formatted_body = content.get("formatted_body", "")
    # Remove HTML tags and extract the text content
    text_content = re.sub(r"<[^>]+>", "", formatted_body).strip()

    # bot_user_name stays None until connect_matrix() has run; leave it out
    # instead of passing None to str.startswith() / re.escape().
    mentions = tuple(name for name in (bot_user_id, bot_user_name) if name)
    if not mentions:
        return False

    # The message has to open with one of the bot's own identifiers.
    if not (full_message.startswith(mentions) or text_content.startswith(mentions)):
        return False

    # Match variations of the mention followed by the command. The command is
    # escaped so regex metacharacters in a command name are taken literally.
    escaped_mentions = "|".join(re.escape(name) for name in mentions)
    pattern = rf"^(?:{escaped_mentions}|[#@].+?)[,:;]?\s*!{re.escape(command)}$"
    return bool(re.match(pattern, full_message)) or bool(
        re.match(pattern, text_content)
    )
async def connect_matrix():
    """
    Establish a connection to the Matrix homeserver.

    Creates the client once and caches it in the module-level
    ``matrix_client``; later calls return the cached instance. Also fills in
    the client's ``device_id`` from ``whoami()`` and stores the bot's display
    name in the module-level ``bot_user_name``.

    :return: The shared AsyncClient instance.
    """
    global matrix_client
    global bot_user_name
    if matrix_client:
        return matrix_client

    # Create SSL context using certifi's certificates
    ssl_context = ssl.create_default_context(cafile=certifi.where())

    # Initialize the Matrix client with custom SSL context
    config = AsyncClientConfig(encryption_enabled=False)
    matrix_client = AsyncClient(
        homeserver=matrix_homeserver,
        user=bot_user_id,
        config=config,
        ssl=ssl_context,
    )

    # Set the access_token and user_id
    matrix_client.access_token = matrix_access_token
    matrix_client.user_id = bot_user_id

    # Attempt to retrieve the device_id using whoami()
    whoami_response = await matrix_client.whoami()
    if isinstance(whoami_response, WhoamiError):
        logger.error(f"Failed to retrieve device_id: {whoami_response.message}")
        matrix_client.device_id = None
    else:
        matrix_client.device_id = whoami_response.device_id
        if matrix_client.device_id:
            logger.debug(f"Retrieved device_id: {matrix_client.device_id}")
        else:
            logger.warning("device_id not returned by whoami()")

    # Fetch the bot's display name. Fall back to the user ID both when the
    # lookup fails and when it succeeds without a display name being set, so
    # bot_user_name is always a string once the client is connected.
    response = await matrix_client.get_displayname(bot_user_id)
    bot_user_name = getattr(response, "displayname", None) or bot_user_id

    return matrix_client
async def join_matrix_room(matrix_client, room_id_or_alias: str) -> None:
    """
    Join a Matrix room by its ID or alias.

    An alias (a value starting with ``#``) is first resolved to a room ID, and
    the matching entry in ``matrix_rooms`` is updated in place to hold that ID.
    Failures are logged, never raised.

    :param matrix_client: Client used to resolve the alias and join the room.
    :param room_id_or_alias: Room ID, or room alias starting with ``#``.
    """
    try:
        if room_id_or_alias.startswith("#"):
            # If it's a room alias, resolve it to a room ID
            response = await matrix_client.room_resolve_alias(room_id_or_alias)
            # A failed lookup may come back as an error object without a
            # room_id attribute, so read it defensively.
            room_id = getattr(response, "room_id", None)
            if not room_id:
                logger.error(
                    f"Failed to resolve room alias '{room_id_or_alias}': "
                    f"{getattr(response, 'message', response)}"
                )
                return

            # Update the room ID in the matrix_rooms list
            for room_config in matrix_rooms:
                if room_config["id"] == room_id_or_alias:
                    room_config["id"] = room_id
                    break
        else:
            room_id = room_id_or_alias

        # Attempt to join the room if not already joined
        if room_id not in matrix_client.rooms:
            response = await matrix_client.join(room_id)
            if response and hasattr(response, "room_id"):
                logger.info(f"Joined room '{room_id_or_alias}' successfully")
            else:
                logger.error(
                    f"Failed to join room '{room_id_or_alias}': "
                    f"{getattr(response, 'message', response)}"
                )
        else:
            logger.debug(f"Bot is already in room '{room_id_or_alias}'")
    except Exception as e:
        logger.error(f"Error joining room '{room_id_or_alias}': {e}")
async def matrix_relay(
    room_id,
    message,
    longname,
    shortname,
    meshnet_name,
    portnum,
    meshtastic_id=None,
    meshtastic_replyId=None,
    meshtastic_text=None,
    emote=False,
    emoji=False,
):
    """
    Relay a message from Meshtastic to Matrix, optionally storing message maps.

    The outgoing event always carries our own local meshnet name; the
    ``meshnet_name`` argument is accepted but not used for the event content.

    Message maps are only stored if ``relay_reactions`` is True, the message
    has a ``meshtastic_id`` and it is not an emote. If ``relay_reactions`` is
    False, nothing is written to the message map. When ``msgs_to_keep`` > 0,
    the message map is pruned after each insert.

    Send failures and timeouts are logged, never raised.

    :param room_id: Matrix room to send to.
    :param message: Text used as the event body.
    :param longname: Sender long name, stored as ``meshtastic_longname``.
    :param shortname: Sender short name, stored as ``meshtastic_shortname``.
    :param meshnet_name: Unused for content; the local meshnet name is sent.
    :param portnum: Stored as ``meshtastic_portnum``.
    :param meshtastic_id: Optional packet id, added to the event and used as
        the message-map key.
    :param meshtastic_replyId: Optional id added to the event content.
    :param meshtastic_text: Optional original text; preferred over ``message``
        when storing the message map.
    :param emote: Send as ``m.emote`` instead of ``m.text``.
    :param emoji: When true, marks the event with ``meshtastic_emoji``.
    """
    matrix_client = await connect_matrix()

    # Retrieve relay_reactions configuration; default to False if not specified.
    relay_reactions = relay_config["meshtastic"].get("relay_reactions", False)

    # Retrieve db config for message_map pruning
    db_config = relay_config.get("db", {})
    msg_map_config = db_config.get("msg_map", {})
    msgs_to_keep = msg_map_config.get(
        "msgs_to_keep", 500
    )  # Default is 500 if not specified

    try:
        # Always use our own local meshnet_name for outgoing events
        local_meshnet_name = relay_config["meshtastic"]["meshnet_name"]
        content = {
            "msgtype": "m.text" if not emote else "m.emote",
            "body": message,
            "meshtastic_longname": longname,
            "meshtastic_shortname": shortname,
            "meshtastic_meshnet": local_meshnet_name,
            "meshtastic_portnum": portnum,
        }
        if meshtastic_id is not None:
            content["meshtastic_id"] = meshtastic_id
        if meshtastic_replyId is not None:
            content["meshtastic_replyId"] = meshtastic_replyId
        if meshtastic_text is not None:
            content["meshtastic_text"] = meshtastic_text
        if emoji:
            content["meshtastic_emoji"] = 1

        response = await asyncio.wait_for(
            matrix_client.room_send(
                room_id=room_id,
                message_type="m.room.message",
                content=content,
            ),
            timeout=5.0,
        )

        # Only a response that carries an event id counts as delivered; do not
        # report success (or store a mapping) for an error response.
        event_id = getattr(response, "event_id", None)
        if event_id is None:
            logger.error(
                f"Error sending radio message to matrix room {room_id}: "
                f"{getattr(response, 'message', response)}"
            )
            return

        logger.info(f"Sent inbound radio message to matrix room: {room_id}")

        # Only store message map if relay_reactions is True and meshtastic_id
        # is present and not an emote.
        if relay_reactions and meshtastic_id is not None and not emote:
            store_message_map(
                meshtastic_id,
                event_id,
                room_id,
                meshtastic_text if meshtastic_text else message,
                meshtastic_meshnet=local_meshnet_name,
            )

            # If msgs_to_keep > 0, prune old messages after inserting a new one
            if msgs_to_keep > 0:
                prune_message_map(msgs_to_keep)

    except asyncio.TimeoutError:
        logger.error("Timed out while waiting for Matrix response")
    except Exception as e:
        logger.error(f"Error sending radio message to matrix room {room_id}: {e}")
def truncate_message(text, max_bytes=227):
    """
    Cut ``text`` down so its UTF-8 encoding fits in ``max_bytes`` bytes.

    :param text: The text to truncate.
    :param max_bytes: The maximum allowed byte size for the truncated text.
    :return: The truncated text.
    """
    encoded = text.encode("utf-8")
    clipped = encoded[:max_bytes]
    # "ignore" drops a multi-byte character that the slice cut in half.
    return clipped.decode("utf-8", "ignore")
# Callback for new messages in Matrix room
async def on_room_message(
    room: MatrixRoom,
    event: Union[RoomMessageText, RoomMessageNotice, ReactionEvent, RoomMessageEmote],
) -> None:
    """
    Handle new messages and reactions in Matrix and relay them to Meshtastic.

    Events older than the bot's start time, events from rooms not listed in
    ``matrix_rooms`` and events flagged with ``mmrelay_suppress`` are ignored.

    For reactions, we ensure that when relaying back to Meshtastic, we always
    apply our local meshnet_name to outgoing events.

    We must be careful not to relay reactions to reactions (reaction-chains),
    especially remote reactions that got relayed into the room as m.emote
    events, as we do not store them in the database. If we can't find the
    original message in the DB, it likely means it's a reaction to a reaction,
    and we stop there.

    Additionally, we only deal with message_map storage (and thus reaction
    linking) if relay_reactions is True. If it's False, none of these mappings
    are stored or used.

    Display names are looked up through the module-level ``matrix_client``.

    :param room: Room the event was received in.
    :param event: The text, notice, emote or reaction event.
    """
    full_display_name = "Unknown user"
    message_timestamp = event.server_timestamp

    # We do not relay messages that occurred before the bot started
    if message_timestamp < bot_start_time:
        return

    # Find the room_config that matches this room, if any
    room_config = None
    for config in matrix_rooms:
        if config["id"] == room.room_id:
            room_config = config
            break

    # Only proceed if the room is supported
    if not room_config:
        return

    relates_to = event.source["content"].get("m.relates_to")
    is_reaction = False
    reaction_emoji = None
    original_matrix_event_id = None

    # Retrieve relay_reactions option from config, defaulting to False
    relay_reactions = relay_config["meshtastic"].get("relay_reactions", False)

    # Check if this is a Matrix ReactionEvent (usually m.reaction)
    if isinstance(event, ReactionEvent):
        is_reaction = True
        logger.debug(f"Processing Matrix reaction event: {event.source}")
        if relates_to and "event_id" in relates_to and "key" in relates_to:
            # Extract the reaction emoji and the original event it relates to
            reaction_emoji = relates_to["key"]
            original_matrix_event_id = relates_to["event_id"]
            logger.debug(
                f"Original matrix event ID: {original_matrix_event_id}, Reaction emoji: {reaction_emoji}"
            )

    # Check if this is a Matrix RoomMessageEmote (m.emote)
    if isinstance(event, RoomMessageEmote):
        logger.debug(f"Processing Matrix reaction event: {event.source}")
        # Every m.emote is treated as a reaction; it is only relayed as a
        # remote reaction further down if meshtastic_replyId exists.
        is_reaction = True
        # We need to manually extract the reaction emoji from the body
        reaction_body = event.source["content"].get("body", "")
        reaction_match = re.search(r"reacted (.+?) to", reaction_body)
        reaction_emoji = reaction_match.group(1).strip() if reaction_match else "?"

    text = event.body.strip() if (not is_reaction and hasattr(event, "body")) else ""

    longname = event.source["content"].get("meshtastic_longname")
    shortname = event.source["content"].get("meshtastic_shortname", None)
    meshnet_name = event.source["content"].get("meshtastic_meshnet")
    meshtastic_replyId = event.source["content"].get("meshtastic_replyId")
    suppress = event.source["content"].get("mmrelay_suppress")

    # If a message has suppress flag, do not process
    if suppress:
        return

    # If this is a reaction and relay_reactions is False, do nothing
    if is_reaction and not relay_reactions:
        logger.debug(
            "Reaction event encountered but relay_reactions is disabled. Doing nothing."
        )
        return

    local_meshnet_name = relay_config["meshtastic"]["meshnet_name"]

    # If this is a reaction and relay_reactions is True, attempt to relay it
    if is_reaction and relay_reactions:
        # If meshnet_name != local_meshnet_name and meshtastic_replyId is
        # present and this is an emote, it's a remote reaction that needs to be
        # forwarded as a text message describing the reaction.
        if (
            meshnet_name
            and meshnet_name != local_meshnet_name
            and meshtastic_replyId
            and isinstance(event, RoomMessageEmote)
        ):
            logger.info(f"Relaying reaction from remote meshnet: {meshnet_name}")

            short_meshnet_name = meshnet_name[:4]

            # The necessary information is in the m.emote event
            if not shortname:
                shortname = longname[:3] if longname else "???"

            # Use meshtastic_text from content if available
            meshtastic_text_db = event.source["content"].get("meshtastic_text", "")
            meshtastic_text_db = meshtastic_text_db.replace("\n", " ").replace(
                "\r", " "
            )
            abbreviated_text = (
                meshtastic_text_db[:40] + "..."
                if len(meshtastic_text_db) > 40
                else meshtastic_text_db
            )

            reaction_message = f'{shortname}/{short_meshnet_name} reacted {reaction_emoji} to "{abbreviated_text}"'

            # Relay the remote reaction to the local meshnet.
            _broadcast_reaction(
                room_config,
                reaction_message,
                f"remote meshnet {meshnet_name}",
                local_meshnet_name,
            )
            # We've relayed the remote reaction to our local mesh, so we're done.
            return

        # If original_matrix_event_id is set, this is a reaction to some other matrix event
        if original_matrix_event_id:
            orig = get_message_map_by_matrix_event_id(original_matrix_event_id)
            if not orig:
                # If we don't find the original message in the DB, we suspect
                # it's a reaction-to-reaction scenario
                logger.debug(
                    "Original message for reaction not found in DB. Possibly a reaction-to-reaction scenario. Not forwarding."
                )
                return

            # orig = (meshtastic_id, matrix_room_id, meshtastic_text, meshtastic_meshnet)
            _, _, meshtastic_text_db, _ = orig

            display_name_response = await matrix_client.get_displayname(event.sender)
            # A failed lookup may return an object without displayname.
            full_display_name = (
                getattr(display_name_response, "displayname", None) or event.sender
            )

            # Not from a remote meshnet: relay back to the originating meshnet
            short_display_name = full_display_name[:5]
            prefix = f"{short_display_name}[M]: "
            abbreviated_text = (
                meshtastic_text_db[:40] + "..."
                if len(meshtastic_text_db) > 40
                else meshtastic_text_db
            )

            reaction_message = (
                f'{prefix}reacted {reaction_emoji} to "{abbreviated_text}"'
            )
            _broadcast_reaction(
                room_config, reaction_message, full_display_name, local_meshnet_name
            )

        # Reactions end here whether or not anything was relayed: they carry
        # no text for the message path below (text is "" for them).
        return

    # For Matrix->Mesh messages from a remote meshnet, rewrite the message format
    if longname and meshnet_name:
        # Always include the meshnet_name in the full display name.
        full_display_name = f"{longname}/{meshnet_name}"

        if meshnet_name != local_meshnet_name:
            # A message from a remote meshnet relayed into Matrix, now going back out
            logger.info(f"Processing message from remote meshnet: {meshnet_name}")
            short_meshnet_name = meshnet_name[:4]
            # If shortname is not available, derive it from the longname
            if shortname is None:
                shortname = longname[:3] if longname else "???"
            # Remove the original prefix "[longname/meshnet]: " to avoid
            # double-tagging. The name is escaped so characters such as
            # "(" or "+" in it are matched literally.
            text = re.sub(rf"^\[{re.escape(full_display_name)}\]: ", "", text)
            text = truncate_message(text)
            full_message = f"{shortname}/{short_meshnet_name}: {text}"
        else:
            # If this message is from our local meshnet (loopback), we ignore it
            return
    else:
        # Normal Matrix message from a Matrix user
        display_name_response = await matrix_client.get_displayname(event.sender)
        # A failed lookup may return an object without displayname.
        full_display_name = (
            getattr(display_name_response, "displayname", None) or event.sender
        )
        short_display_name = full_display_name[:5]
        prefix = f"{short_display_name}[M]: "
        logger.debug(f"Processing matrix message from [{full_display_name}]: {text}")
        full_message = f"{prefix}{text}"
        # NOTE(review): full_message is built before text is truncated, so only
        # the text stored in the message map is shortened here - confirm that
        # the outgoing message is meant to stay untruncated.
        text = truncate_message(text)

    # Plugin functionality (imported here to avoid a circular import)
    from plugin_loader import load_plugins

    plugins = load_plugins()

    # Offer the message to plugins until the first one reports it handled it
    found_matching_plugin = False
    for plugin in plugins:
        if not found_matching_plugin:
            found_matching_plugin = await plugin.handle_room_message(
                room, event, full_message
            )
            if found_matching_plugin:
                logger.debug(f"Processed by plugin {plugin.plugin_name}")

    # Check if the message is a command directed at the bot
    is_command = False
    for plugin in plugins:
        for command in plugin.get_matrix_commands():
            if bot_command(command, event):
                is_command = True
                break
        if is_command:
            break

    # If this is a command, we do not send it to the mesh
    if is_command:
        logger.debug("Message is a command, not sending to mesh")
        return

    # Connect to Meshtastic
    meshtastic_interface = connect_meshtastic()
    from meshtastic_utils import logger as meshtastic_logger

    meshtastic_channel = room_config["meshtastic_channel"]

    # If message is from Matrix and broadcast_enabled is True, relay to Meshtastic.
    # Note: If relay_reactions is False, we won't store message_map, but we can
    # still relay. The lack of message_map storage just means no reaction
    # bridging will occur.
    if not found_matching_plugin and event.sender != bot_user_id:
        if relay_config["meshtastic"]["broadcast_enabled"]:
            portnum = event.source["content"].get("meshtastic_portnum")
            if portnum == "DETECTION_SENSOR_APP":
                # If detection_sensor is enabled, forward this data as detection sensor data
                if relay_config["meshtastic"].get("detection_sensor", False):
                    sent_packet = meshtastic_interface.sendData(
                        data=full_message.encode("utf-8"),
                        channelIndex=meshtastic_channel,
                        portNum=meshtastic.protobuf.portnums_pb2.PortNum.DETECTION_SENSOR_APP,
                    )
                    # Store the message map only if relay_reactions is True
                    if relay_reactions:
                        _store_sent_packet(
                            sent_packet,
                            event.event_id,
                            room.room_id,
                            text,
                            local_meshnet_name,
                        )
                else:
                    meshtastic_logger.debug(
                        f"Detection sensor packet received from {full_display_name}, but detection sensor processing is disabled."
                    )
            else:
                meshtastic_logger.info(
                    f"Relaying message from {full_display_name} to radio broadcast"
                )
                sent_packet = meshtastic_interface.sendText(
                    text=full_message, channelIndex=meshtastic_channel
                )
                # Store message_map only if relay_reactions is True
                if relay_reactions:
                    _store_sent_packet(
                        sent_packet,
                        event.event_id,
                        room.room_id,
                        text,
                        local_meshnet_name,
                    )
        else:
            logger.debug(
                f"Broadcast not supported: Message from {full_display_name} dropped."
            )


def _broadcast_reaction(room_config, reaction_message, source_label, local_meshnet_name):
    """Send a reaction description to the room's Meshtastic channel if broadcasting is enabled."""
    meshtastic_interface = connect_meshtastic()
    from meshtastic_utils import logger as meshtastic_logger

    meshtastic_channel = room_config["meshtastic_channel"]

    if relay_config["meshtastic"]["broadcast_enabled"]:
        meshtastic_logger.info(
            f"Relaying reaction from {source_label} to radio broadcast"
        )
        logger.debug(
            f"Sending reaction to Meshtastic with meshnet={local_meshnet_name}: {reaction_message}"
        )
        meshtastic_interface.sendText(
            text=reaction_message, channelIndex=meshtastic_channel
        )


def _store_sent_packet(sent_packet, matrix_event_id, room_id, text, meshnet_name):
    """Record a sent packet in the message map, then prune the map if configured."""
    # Nothing to record when the send returned no packet or one without an id
    if not (sent_packet and hasattr(sent_packet, "id")):
        return

    store_message_map(
        sent_packet.id,
        matrix_event_id,
        room_id,
        text,
        meshtastic_meshnet=meshnet_name,
    )
    db_config = relay_config.get("db", {})
    msg_map_config = db_config.get("msg_map", {})
    msgs_to_keep = msg_map_config.get("msgs_to_keep", 500)
    if msgs_to_keep > 0:
        prune_message_map(msgs_to_keep)
async def upload_image(
    client: AsyncClient, image: Image.Image, filename: str
) -> UploadResponse:
    """
    Encode ``image`` as PNG, upload it to Matrix and return the upload response.

    :param client: Client used for the upload.
    :param image: Image to encode and upload.
    :param filename: File name reported to the server.
    :return: The first element of the pair returned by ``client.upload``.
    """
    # Serialize the image to PNG bytes in memory.
    with io.BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()

    upload_result = await client.upload(
        io.BytesIO(png_bytes),
        content_type="image/png",
        filename=filename,
        filesize=len(png_bytes),
    )
    # The second element of the pair is not needed here.
    response, _maybe_keys = upload_result
    return response
async def send_room_image(
    client: AsyncClient, room_id: str, upload_response: UploadResponse
):
    """
    Post an already uploaded image to the given room as an ``m.image`` message.

    :param client: Client used to send the message.
    :param room_id: Room to post the image in.
    :param upload_response: Upload result whose ``content_uri`` is sent as the URL.
    """
    image_content = {
        "msgtype": "m.image",
        "url": upload_response.content_uri,
        "body": "",
    }
    await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content=image_content,
    )