Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 39 additions & 40 deletions gateway/platforms/whatsapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from typing import Dict, Optional, Any

from hermes_cli.config import get_hermes_home
from hermes_constants import get_hermes_dir

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -135,7 +134,7 @@ def __init__(self, config: PlatformConfig):
)
self._session_path: Path = Path(config.extra.get(
"session_path",
get_hermes_dir("platforms/whatsapp/session", "whatsapp/session")
get_hermes_home() / "whatsapp" / "session"
))
self._reply_prefix: Optional[str] = config.extra.get("reply_prefix")
self._message_queue: asyncio.Queue = asyncio.Queue()
Expand Down Expand Up @@ -163,7 +162,7 @@ async def connect(self) -> bool:
# Auto-install npm dependencies if node_modules doesn't exist
bridge_dir = bridge_path.parent
if not (bridge_dir / "node_modules").exists():
print(f"[{self.name}] Installing WhatsApp bridge dependencies...")
logger.debug("[%s] Installing WhatsApp bridge dependencies...", self.name)
try:
install_result = subprocess.run(
["npm", "install", "--silent"],
Expand All @@ -173,11 +172,11 @@ async def connect(self) -> bool:
timeout=60,
)
if install_result.returncode != 0:
print(f"[{self.name}] npm install failed: {install_result.stderr}")
logger.warning("[%s] npm install failed: %s", self.name, install_result.stderr)
return False
print(f"[{self.name}] Dependencies installed")
logger.debug("[%s] Dependencies installed", self.name)
except Exception as e:
print(f"[{self.name}] Failed to install dependencies: {e}")
logger.warning("[%s] Failed to install dependencies: %s", self.name, e)
return False

try:
Expand All @@ -197,13 +196,13 @@ async def connect(self) -> bool:
data = await resp.json()
bridge_status = data.get("status", "unknown")
if bridge_status == "connected":
print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
logger.debug("[%s] Using existing bridge (status: %s)", self.name, bridge_status)
self._mark_connected()
self._bridge_process = None # Not managed by us
self._poll_task = asyncio.create_task(self._poll_messages())
return True
else:
print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting")
logger.warning("[%s] Bridge found but not connected (status: %s), restarting", self.name, bridge_status)
except Exception:
pass # Bridge not running, start a new one

Expand Down Expand Up @@ -249,8 +248,8 @@ async def connect(self) -> bool:
for attempt in range(15):
await asyncio.sleep(1)
if self._bridge_process.poll() is not None:
print(f"[{self.name}] Bridge process died (exit code {self._bridge_process.returncode})")
print(f"[{self.name}] Check log: {self._bridge_log}")
logger.warning("[%s] Bridge process died (exit code %s)", self.name, self._bridge_process.returncode)
logger.debug("[%s] Check log: %s", self.name, self._bridge_log)
self._close_bridge_log()
return False
try:
Expand All @@ -263,26 +262,26 @@ async def connect(self) -> bool:
http_ready = True
data = await resp.json()
if data.get("status") == "connected":
print(f"[{self.name}] Bridge ready (status: connected)")
logger.debug("[%s] Bridge ready (status: connected)", self.name)
break
except Exception:
continue

if not http_ready:
print(f"[{self.name}] Bridge HTTP server did not start in 15s")
print(f"[{self.name}] Check log: {self._bridge_log}")
logger.warning("[%s] Bridge HTTP server did not start in 15s", self.name)
logger.debug("[%s] Check log: %s", self.name, self._bridge_log)
self._close_bridge_log()
return False

# Phase 2: HTTP is up but WhatsApp may still be connecting.
# Give it more time to authenticate with saved credentials.
if data.get("status") != "connected":
print(f"[{self.name}] Bridge HTTP ready, waiting for WhatsApp connection...")
logger.debug("[%s] Bridge HTTP ready, waiting for WhatsApp connection...", self.name)
for attempt in range(15):
await asyncio.sleep(1)
if self._bridge_process.poll() is not None:
print(f"[{self.name}] Bridge process died during connection")
print(f"[{self.name}] Check log: {self._bridge_log}")
logger.warning("[%s] Bridge process died during connection", self.name)
logger.debug("[%s] Check log: %s", self.name, self._bridge_log)
self._close_bridge_log()
return False
try:
Expand All @@ -294,22 +293,22 @@ async def connect(self) -> bool:
if resp.status == 200:
data = await resp.json()
if data.get("status") == "connected":
print(f"[{self.name}] Bridge ready (status: connected)")
logger.debug("[%s] Bridge ready (status: connected)", self.name)
break
except Exception:
continue
else:
# Still not connected β€” warn but proceed (bridge may
# auto-reconnect later, e.g. after a code 515 restart).
print(f"[{self.name}] ⚠ WhatsApp not connected after 30s")
print(f"[{self.name}] Bridge log: {self._bridge_log}")
print(f"[{self.name}] If session expired, re-pair: hermes whatsapp")
logger.warning("[%s] ⚠ WhatsApp not connected after 30s", self.name)
logger.debug("[%s] Bridge log: %s", self.name, self._bridge_log)
logger.debug("[%s] If session expired, re-pair: hermes whatsapp", self.name)

# Start message polling task
self._poll_task = asyncio.create_task(self._poll_messages())

self._mark_connected()
print(f"[{self.name}] Bridge started on port {self._bridge_port}")
logger.debug("[%s] Bridge started on port %s", self.name, self._bridge_port)
return True

except Exception as e:
Expand Down Expand Up @@ -366,15 +365,15 @@ async def disconnect(self) -> None:
except (ProcessLookupError, PermissionError):
self._bridge_process.kill()
except Exception as e:
print(f"[{self.name}] Error stopping bridge: {e}")
logger.warning("[%s] Error stopping bridge: %s", self.name, e)
else:
# Bridge was not started by us, don't kill it
print(f"[{self.name}] Disconnecting (external bridge left running)")
logger.debug("[%s] Disconnecting (external bridge left running)", self.name)

self._mark_disconnected()
self._bridge_process = None
self._close_bridge_log()
print(f"[{self.name}] Disconnected")
logger.debug("[%s] Disconnected", self.name)

async def send(
self,
Expand Down Expand Up @@ -609,13 +608,13 @@ async def _poll_messages(self) -> None:
try:
import aiohttp
except ImportError:
print(f"[{self.name}] aiohttp not installed, message polling disabled")
logger.debug("[%s] aiohttp not installed, message polling disabled", self.name)
return

while self._running:
bridge_exit = await self._check_managed_bridge_exit()
if bridge_exit:
print(f"[{self.name}] {bridge_exit}")
logger.debug("[%s] %s", self.name, bridge_exit)
break
try:
async with aiohttp.ClientSession() as session:
Expand All @@ -634,9 +633,9 @@ async def _poll_messages(self) -> None:
except Exception as e:
bridge_exit = await self._check_managed_bridge_exit()
if bridge_exit:
print(f"[{self.name}] {bridge_exit}")
logger.debug("[%s] %s", self.name, bridge_exit)
break
print(f"[{self.name}] Poll error: {e}")
logger.warning("[%s] Poll error: %s", self.name, e)
await asyncio.sleep(5)

await asyncio.sleep(1) # Poll interval
Expand Down Expand Up @@ -681,42 +680,42 @@ async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEv
cached_path = await cache_image_from_url(url, ext=".jpg")
cached_urls.append(cached_path)
media_types.append("image/jpeg")
print(f"[{self.name}] Cached user image: {cached_path}", flush=True)
logger.debug("[%s] Cached user image: %s", self.name, cached_path)
except Exception as e:
print(f"[{self.name}] Failed to cache image: {e}", flush=True)
logger.warning("[%s] Failed to cache image: %s", self.name, e)
cached_urls.append(url)
media_types.append("image/jpeg")
elif msg_type == MessageType.PHOTO and os.path.isabs(url):
# Local file path β€” bridge already downloaded the image
cached_urls.append(url)
media_types.append("image/jpeg")
print(f"[{self.name}] Using bridge-cached image: {url}", flush=True)
logger.debug("[%s] Using bridge-cached image: %s", self.name, url)
elif msg_type == MessageType.VOICE and url.startswith(("http://", "https://")):
try:
cached_path = await cache_audio_from_url(url, ext=".ogg")
cached_urls.append(cached_path)
media_types.append("audio/ogg")
print(f"[{self.name}] Cached user voice: {cached_path}", flush=True)
logger.debug("[%s] Cached user voice: %s", self.name, cached_path)
except Exception as e:
print(f"[{self.name}] Failed to cache voice: {e}", flush=True)
logger.warning("[%s] Failed to cache voice: %s", self.name, e)
cached_urls.append(url)
media_types.append("audio/ogg")
elif msg_type == MessageType.VOICE and os.path.isabs(url):
# Local file path β€” bridge already downloaded the audio
cached_urls.append(url)
media_types.append("audio/ogg")
print(f"[{self.name}] Using bridge-cached audio: {url}", flush=True)
logger.debug("[%s] Using bridge-cached audio: %s", self.name, url)
elif msg_type == MessageType.DOCUMENT and os.path.isabs(url):
# Local file path β€” bridge already downloaded the document
cached_urls.append(url)
ext = Path(url).suffix.lower()
mime = SUPPORTED_DOCUMENT_TYPES.get(ext, "application/octet-stream")
media_types.append(mime)
print(f"[{self.name}] Using bridge-cached document: {url}", flush=True)
logger.debug("[%s] Using bridge-cached document: %s", self.name, url)
elif msg_type == MessageType.VIDEO and os.path.isabs(url):
cached_urls.append(url)
media_types.append("video/mp4")
print(f"[{self.name}] Using bridge-cached video: {url}", flush=True)
logger.debug("[%s] Using bridge-cached video: %s", self.name, url)
else:
cached_urls.append(url)
media_types.append("unknown")
Expand All @@ -733,7 +732,7 @@ async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEv
try:
file_size = Path(doc_path).stat().st_size
if file_size > MAX_TEXT_INJECT_BYTES:
print(f"[{self.name}] Skipping text injection for {doc_path} ({file_size} bytes > {MAX_TEXT_INJECT_BYTES})", flush=True)
logger.debug("[%s] Skipping text injection for %s (%s bytes > %s)", self.name, doc_path, file_size, MAX_TEXT_INJECT_BYTES)
continue
content = Path(doc_path).read_text(errors="replace")
fname = Path(doc_path).name
Expand All @@ -748,9 +747,9 @@ async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEv
body = f"{injection}\n\n{body}"
else:
body = injection
print(f"[{self.name}] Injected text content from: {doc_path}", flush=True)
logger.debug("[%s] Injected text content from: %s", self.name, doc_path)
except Exception as e:
print(f"[{self.name}] Failed to read document text: {e}", flush=True)
logger.warning("[%s] Failed to read document text: %s", self.name, e)

return MessageEvent(
text=body,
Expand All @@ -762,5 +761,5 @@ async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEv
media_types=media_types,
)
except Exception as e:
print(f"[{self.name}] Error building event: {e}")
logger.warning("[%s] Error building event: %s", self.name, e)
return None
Loading