Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 76 additions & 81 deletions custom_components/meshtastic/aiomeshtastic/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import functools
import itertools
import random
import ssl
from collections import defaultdict, deque
from collections.abc import AsyncIterator, Awaitable, Callable, Mapping, MutableMapping
from dataclasses import dataclass
Expand All @@ -16,15 +17,12 @@
Self,
)

from homeassistant.util.ssl import get_default_context

try:
import aiomqtt
from aiomqtt import MqttError
import paho.mqtt.client as mqtt

_has_aiomqtt = True
_has_mqtt = True
except ImportError:
_has_aiomqtt = False
_has_mqtt = False
import google
from google.protobuf.message import Message

Expand Down Expand Up @@ -168,15 +166,14 @@ def __init__( # noqa: PLR0913

# MQTT client for persistent connection
self._mqtt_proxy_enabled = enable_mqtt_proxy
if self._mqtt_proxy_enabled and not _has_aiomqtt:
self._logger.warning("Could not enable MQTT proxy because aiomqtt is not installed")
if self._mqtt_proxy_enabled and not _has_mqtt:
self._logger.warning("Could not enable MQTT proxy because paho-mqtt is not installed")
self._mqtt_proxy_enabled = False

if self._mqtt_proxy_enabled:
self._mqtt_client: aiomqtt.Client | None = None
self._mqtt_connected = False
self._mqtt_connection_task: asyncio.Task | None = None
self._mqtt_config: dict[str, str] | None = None
self._mqtt_client: mqtt.Client | None = None
self._mqtt_message_queue = deque()
self._mqtt_retry_task = None

def add_packet_app_listener(
self,
Expand Down Expand Up @@ -346,9 +343,12 @@ async def stop(self) -> None:
await self._cancel_processing_tasks()
await self._cancel_background_tasks()

# MQTT client will be closed automatically when the context manager exits
self._mqtt_client = None
self._mqtt_connected = False
# Clean up MQTT client
if self._mqtt_client is not None:
self._logger.debug("Stopping MQTT client")
self._mqtt_client.disconnect()
self._mqtt_client.loop_stop()
self._mqtt_client = None

with contextlib.suppress(Exception):
await self._connection.send_disconnect()
Expand Down Expand Up @@ -397,6 +397,10 @@ async def connected_node_ready(self) -> bool:

async def _init_mqtt_client(self) -> None:
"""Initialize the MQTT client if MQTT is enabled in the module config."""
if self._mqtt_client is not None:
self._logger.debug("MQTT client already initialized, skipping.")
return

if not await self.connected_node_ready():
self._logger.debug("Node not ready, not initializing MQTT client")
return
Expand All @@ -406,103 +410,94 @@ async def _init_mqtt_client(self) -> None:
self._logger.debug("MQTT not enabled in module config, not initializing client")
return

# Get MQTT configuration
broker = mqtt_config.address or "mqtt.meshtastic.org"
broker = mqtt_config.address
username = mqtt_config.username
password = mqtt_config.password
use_tls = mqtt_config.tls_enabled

# Parse broker address
hostname = broker.split(":", 1)[0]
port = int(broker.split(":", 1)[1]) if ":" in broker else 1883

# Get node ID for client identifier
node_id = self._connected_node_info.my_node_num
client_id = f"!{node_id:08x}"
client_id = f"!{node_id:08x}-ha"

self._logger.info("Initializing MQTT client")

# Create MQTT client configuration
self._mqtt_config = {
"hostname": hostname,
"port": port,
"username": username or None,
"password": password or None,
"tls_context": get_default_context() if use_tls else None,
"identifier": client_id,
}

# Start connection task
self._mqtt_connection_task = self._add_background_task(self._maintain_mqtt_connection(), name="mqtt-connection")

async def _maintain_mqtt_connection(self) -> None:
"""Maintains the MQTT connection and handles reconnections."""
while self.is_running:
try:
self._logger.debug("Connecting to MQTT broker")
self._mqtt_client = mqtt.Client(client_id=client_id)

self._mqtt_client = aiomqtt.Client(**self._mqtt_config)
self._mqtt_client.on_connect = self._on_mqtt_connect
self._mqtt_client.on_disconnect = self._on_mqtt_disconnect

# When the context manager exits, the connection is closed
async with self._mqtt_client:
self._mqtt_connected = True
self._logger.debug("Connected to MQTT broker")
if use_tls:
self._mqtt_client.tls_set(
ca_certs=None, # Use default CA certs
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS,
ciphers=None,
)

# Wait until the interface is stopped
await self._is_stopped.wait()
if username and password:
self._mqtt_client.username_pw_set(username, password)

# Interface stopped, don't reconnect
break
except MqttError as e:
self._logger.warning("Meshtastic MQTT proxy connection error: %s", e)
finally:
self._mqtt_connected = False
self._logger.debug("MQTT connection closed")
self._logger.debug("Connecting to MQTT broker %s:%d", hostname, port)
self._mqtt_client.connect_async(hostname, port)

# Wait before attempting to reconnect
self._logger.debug("Reconnecting MQTT in 5 seconds")
try:
await asyncio.sleep(5)
except asyncio.CancelledError:
break
self._mqtt_client.loop_start()

def _on_mqtt_connect(self, _client: mqtt.Client, _userdata: Any, _flags: dict[str, Any], rc: int) -> None:
if rc == 0:
self._logger.info("Connected to MQTT broker")
else:
self._logger.warning("Failed to connect to MQTT broker, return code: %d", rc)

def _on_mqtt_disconnect(self, _client: mqtt.Client, _userdata: Any, rc: int) -> None:
self._logger.info("Disconnected from MQTT broker, return code: %d", rc)

async def _handle_mqtt_client_proxy_message(self, message: mesh_pb2.MqttClientProxyMessage) -> None:
"""
Handle MQTT client proxy messages from the radio.

This receives MqttClientProxyMessage messages from the radio and forwards them to the
configured MQTT broker.
configured MQTT broker. Will retry up to 3 times on failure.
"""
if (
not hasattr(self._connected_node_module_config, "mqtt")
or not self._connected_node_module_config.mqtt.enabled
or not self._connected_node_module_config.mqtt.proxy_to_client_enabled
):
return

if not self._mqtt_connected or self._mqtt_client is None:
self._logger.debug("MQTT client not yet connected")
if self._mqtt_client is None:
self._logger.debug("No MQTT client available, message will be lost")
return

self._logger.debug("Publishing MQTT message")
self._logger.debug("Publishing MQTT message to topic: {message.topic}")

try:
if message.HasField("data"):
await self._mqtt_client.publish(
message.topic,
payload=message.data,
retain=message.retained,
qos=1,
)
payload = message.data
elif message.HasField("text"):
await self._mqtt_client.publish(
message.topic,
payload=message.text.encode("utf-8"),
retain=message.retained,
qos=1,
)
payload = message.text.encode("utf-8")
else:
self._logger.debug("No payload in MQTT message, ignoring")
return

max_retries = 3
retry_count = 0

while retry_count < max_retries:
result = self._mqtt_client.publish(message.topic, payload=payload, qos=2, retain=message.retained)

if result.rc == mqtt.MQTT_ERR_SUCCESS:
return

self._logger.warning(
"Error publishing MQTT message: %s, attempt %d of %d",
mqtt.error_string(result.rc),
retry_count + 1,
max_retries,
)

retry_count += 1
if retry_count < max_retries:
await asyncio.sleep(1)

self._logger.error("Failed to publish MQTT message after %d attempts", max_retries)
except Exception:
self._logger.exception("Error publishing MQTT message")

Expand Down
2 changes: 1 addition & 1 deletion custom_components/meshtastic/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"issue_tracker": "https://github.com/meshtastic/home-assistant/issues",
"requirements": [
"pyserial-asyncio==0.6",
"aiomqtt"
"paho-mqtt>=2.1.0"
],
"usb": [
{
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ ruff==0.11.2
bleak~=0.22.3
pyserial-asyncio~=0.6

aiomqtt>=1.2.0
paho-mqtt>=2.1.0