diff --git a/Dockerfile b/Dockerfile index 689eb70..b1f9495 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ RUN apt-get update \ WORKDIR /app COPY . . -RUN pip install --no-cache-dir -e ".[mcp]" +RUN pip install --no-cache-dir -e ".[mcp,telegram]" # Persistent state volume (Railway mounts here) RUN mkdir -p /data diff --git a/cli/commands/telegram_cmd.py b/cli/commands/telegram_cmd.py new file mode 100644 index 0000000..9530707 --- /dev/null +++ b/cli/commands/telegram_cmd.py @@ -0,0 +1,58 @@ +"""hl telegram — start the Telegram bot interface.""" +from __future__ import annotations + +import logging +import sys +from pathlib import Path + +import typer + +telegram_app = typer.Typer() + + +@telegram_app.command("start") +def telegram_start( + mainnet: bool = typer.Option( + False, "--mainnet", + help="Connect to mainnet (default: testnet)", + ), + dry_run: bool = typer.Option( + False, "--dry-run", + help="Agents run in dry-run mode (no real orders)", + ), +): + """Start the Telegram bot for deploying and controlling trading agents.""" + project_root = str(Path(__file__).resolve().parent.parent.parent) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)-14s %(levelname)-5s %(message)s", + datefmt="%H:%M:%S", + ) + + from tg_bot.config import TelegramBotConfig + from tg_bot.bot import run_bot + + config = TelegramBotConfig.from_env() + + if mainnet: + config.default_network = "mainnet" + + if not config.bot_token: + typer.echo( + "ERROR: TELEGRAM_BOT_TOKEN not set.\n" + "1. Create a bot via @BotFather on Telegram\n" + "2. Set TELEGRAM_BOT_TOKEN= in your environment\n" + "3. Run this command again", + err=True, + ) + raise typer.Exit(code=1) + + typer.echo(f"Network: {config.default_network}") + typer.echo(f"Chat IDs: {config.allowed_chat_ids or 'auto-detect on first /start'}") + typer.echo("Bot starting... (Ctrl+C to stop)") + typer.echo("") + + run_bot(config) diff --git a/cli/main.py b/cli/main.py index 6253f07..e67b7dc 100644 --- a/cli/main.py +++ b/cli/main.py @@ -35,6 +35,7 @@ from cli.commands.skills import skills_app from cli.commands.journal import journal_app from cli.commands.keys import keys_app +from cli.commands.telegram_cmd import telegram_app app.command("run", help="Start autonomous trading with a strategy")(run_cmd) app.command("status", help="Show positions, PnL, and risk state")(status_cmd) @@ -53,6 +54,7 @@ app.add_typer(skills_app, name="skills", help="Skill discovery and registry") app.add_typer(journal_app, name="journal", help="Trade journal — structured position records with reasoning") app.add_typer(keys_app, name="keys", help="Unified key management across backends") +app.add_typer(telegram_app, name="telegram", help="Telegram bot — deploy agents from chat") def main(): diff --git a/pyproject.toml b/pyproject.toml index 0b55b07..ae08f9f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ [project.optional-dependencies] llm = ["anthropic>=0.40.0"] mcp = ["mcp>=1.0.0"] +telegram = ["python-telegram-bot>=21.0"] dev = ["pytest>=7.0", "ruff>=0.4.0", "mypy>=1.8.0"] [project.scripts] @@ -55,4 +56,4 @@ disallow_untyped_defs = false ignore_missing_imports = true [tool.setuptools.packages.find] -include = ["cli*", "strategies*", "sdk*", "common*", "parent*", "modules*", "skills*", "quoting_engine*"] +include = ["cli*", "strategies*", "sdk*", "common*", "parent*", "modules*", "skills*", "quoting_engine*", "tg_bot*"] diff --git a/scripts/entrypoint.py b/scripts/entrypoint.py index 83839c5..c333346 100644 --- a/scripts/entrypoint.py +++ b/scripts/entrypoint.py @@ -306,8 +306,14 @@ def build_command() -> list[str]: elif mode == "mcp": return py + ["mcp", "serve", "--transport", "sse"] + elif mode == "telegram": + cmd = py + ["telegram", "start"] + if os.environ.get("HL_TESTNET", "true").lower() == "false": + cmd.append("--mainnet") + return cmd + else: - log.error("Unknown RUN_MODE: %s. Use apex, wolf, strategy, or mcp.", mode) + log.error("Unknown RUN_MODE: %s. Use apex, wolf, strategy, mcp, or telegram.", mode) sys.exit(1) diff --git a/tg_bot/__init__.py b/tg_bot/__init__.py new file mode 100644 index 0000000..28edfb8 --- /dev/null +++ b/tg_bot/__init__.py @@ -0,0 +1 @@ +"""Telegram bot interface for deploying and controlling on-chain trading agents.""" diff --git a/tg_bot/auth.py b/tg_bot/auth.py new file mode 100644 index 0000000..d6c6796 --- /dev/null +++ b/tg_bot/auth.py @@ -0,0 +1,64 @@ +"""Single-user authentication for Telegram bot.""" +from __future__ import annotations + +import logging +from functools import wraps +from typing import Callable + +from telegram import Update +from telegram.ext import ContextTypes + +log = logging.getLogger("telegram.auth") + +# File to persist auto-detected chat ID +_CHAT_ID_FILE = None + + +def _get_chat_id_file(): + from pathlib import Path + return Path.home() / ".hl-agent" / "telegram_chat_id" + + +def load_persisted_chat_id() -> int | None: + """Load previously persisted chat ID from disk.""" + path = _get_chat_id_file() + if path.exists(): + try: + return int(path.read_text().strip()) + except (ValueError, OSError): + pass + return None + + +def persist_chat_id(chat_id: int) -> None: + """Save chat ID to disk for persistence across restarts.""" + path = _get_chat_id_file() + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(str(chat_id)) + + +def authorized(func: Callable) -> Callable: + """Decorator that restricts handler to allowed chat IDs. + + On first interaction, if no chat IDs are configured, auto-registers the first user. + """ + @wraps(func) + async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE): + chat_id = update.effective_chat.id + allowed: list = context.bot_data.get("allowed_chat_ids", []) + + if not allowed: + # Auto-register first user + allowed.append(chat_id) + context.bot_data["allowed_chat_ids"] = allowed + persist_chat_id(chat_id) + log.info("Auto-registered chat ID %d as authorized user", chat_id) + + if chat_id not in allowed: + log.warning("Unauthorized access attempt from chat_id=%d", chat_id) + await update.message.reply_text("Unauthorized. This bot is private.") + return + + return await func(update, context) + + return wrapper diff --git a/tg_bot/bot.py b/tg_bot/bot.py new file mode 100644 index 0000000..fe46bab --- /dev/null +++ b/tg_bot/bot.py @@ -0,0 +1,87 @@ +"""Telegram bot application — entry point and handler registration.""" +from __future__ import annotations + +import asyncio +import logging + +from telegram.ext import Application + +from tg_bot.auth import load_persisted_chat_id +from tg_bot.config import TelegramBotConfig +from tg_bot.engine_bridge import EngineBridge +from tg_bot.handlers.apex import register_apex_handlers +from tg_bot.handlers.control import register_control_handlers +from tg_bot.handlers.start import build_start_handler +from tg_bot.handlers.strategy import build_deploy_handler +from tg_bot.notifier import Notifier + +log = logging.getLogger("telegram.bot") + + +async def post_init(application: Application) -> None: + """Called after bot is initialized but before polling starts.""" + config: TelegramBotConfig = application.bot_data["config"] + event_queue: asyncio.Queue = application.bot_data["event_queue"] + + # Auto-detect chat ID for notifications + chat_id = None + if config.allowed_chat_ids: + chat_id = config.allowed_chat_ids[0] + else: + persisted = load_persisted_chat_id() + if persisted: + config.allowed_chat_ids.append(persisted) + chat_id = persisted + + if chat_id: + notifier = Notifier( + bot=application.bot, + chat_id=chat_id, + event_queue=event_queue, + pnl_interval_s=config.notification_interval_s, + tick_summary_interval_s=config.tick_summary_interval_s, + ) + notifier.start() + application.bot_data["notifier"] = notifier + log.info("Notifier started for chat_id=%d", chat_id) + else: + log.info("No chat ID configured — notifier will start after /start") + + +def run_bot(config: TelegramBotConfig) -> None: + """Build and run the Telegram bot (blocking).""" + if not config.bot_token: + raise RuntimeError("TELEGRAM_BOT_TOKEN is required. Set it in your environment.") + + log.info("Starting Telegram bot (network=%s)", config.default_network) + + # Create event queue for engine -> bot communication + event_queue = asyncio.Queue() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Build application + application = ( + Application.builder() + .token(config.bot_token) + .post_init(post_init) + .build() + ) + + # Store shared state + application.bot_data["config"] = config + application.bot_data["event_queue"] = event_queue + application.bot_data["allowed_chat_ids"] = list(config.allowed_chat_ids) + + # Create engine bridge + bridge = EngineBridge(event_queue=event_queue, loop=loop) + application.bot_data["engine_bridge"] = bridge + + # Register handlers (order matters — ConversationHandlers first) + application.add_handler(build_start_handler()) + application.add_handler(build_deploy_handler()) + register_control_handlers(application) + register_apex_handlers(application) + + log.info("Bot ready — polling for updates") + application.run_polling(drop_pending_updates=True) diff --git a/tg_bot/config.py b/tg_bot/config.py new file mode 100644 index 0000000..9ab596c --- /dev/null +++ b/tg_bot/config.py @@ -0,0 +1,39 @@ +"""Telegram bot configuration.""" +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import List, Optional + + +@dataclass +class TelegramBotConfig: + """Configuration for the Telegram trading bot.""" + + bot_token: str = "" + allowed_chat_ids: List[int] = field(default_factory=list) + default_network: str = "testnet" + mainnet_confirmation: bool = True + notification_interval_s: int = 60 + tick_summary_interval_s: int = 300 + max_concurrent_agents: int = 1 + + @classmethod + def from_env(cls) -> "TelegramBotConfig": + token = os.environ.get("TELEGRAM_BOT_TOKEN", "") + chat_ids_raw = os.environ.get("TELEGRAM_CHAT_ID", "") + chat_ids = [] + if chat_ids_raw: + chat_ids = [int(x.strip()) for x in chat_ids_raw.split(",") if x.strip()] + + network = "mainnet" if os.environ.get("HL_TESTNET", "true").lower() == "false" else "testnet" + + return cls( + bot_token=token, + allowed_chat_ids=chat_ids, + default_network=network, + ) + + @property + def is_mainnet(self) -> bool: + return self.default_network == "mainnet" diff --git a/tg_bot/engine_bridge.py b/tg_bot/engine_bridge.py new file mode 100644 index 0000000..fde7263 --- /dev/null +++ b/tg_bot/engine_bridge.py @@ -0,0 +1,312 @@ +"""Thread-safe bridge between Telegram bot and TradingEngine.""" +from __future__ import annotations + +import asyncio +import logging +import os +import signal +import threading +import time +from decimal import Decimal +from typing import Any, Dict, Optional + +from cli.engine import TradingEngine + +log = logging.getLogger("telegram.bridge") +ZERO = Decimal("0") + + +class NotifyingEngine(TradingEngine): + """TradingEngine subclass that pushes events to an asyncio queue.""" + + def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop, **kwargs): + super().__init__(**kwargs) + self._event_queue = event_queue + self._loop = loop + + def _push_event(self, event: Dict[str, Any]) -> None: + """Thread-safe push from engine thread to async queue.""" + try: + self._loop.call_soon_threadsafe(self._event_queue.put_nowait, event) + except Exception: + pass # Don't crash engine if notification fails + + def _log_tick(self, snapshot, decisions, fills, ok: bool) -> None: + """Override to also push tick data to Telegram.""" + super()._log_tick(snapshot, decisions, fills, ok) + + agent_id = self.strategy.strategy_id + pos = self.position_tracker.get_agent_position(agent_id, self.instrument) + mid_dec = Decimal(str(snapshot.mid_price)) + + # Push fills as individual events + for fill in fills: + self._push_event({ + "type": "fill", + "side": fill.side, + "quantity": str(fill.quantity), + "price": str(fill.price), + "instrument": fill.instrument, + "strategy": self.strategy.strategy_id, + "tick": self.tick_count, + }) + + # Push tick summary + self._push_event({ + "type": "tick", + "tick_count": self.tick_count, + "instrument": self.instrument, + "strategy": self.strategy.strategy_id, + "mid_price": snapshot.mid_price, + "pos_qty": float(pos.net_qty), + "avg_entry": float(pos.avg_entry_price), + "upnl": float(pos.unrealized_pnl(mid_dec)), + "rpnl": float(pos.realized_pnl), + "orders_sent": len(decisions), + "orders_filled": len(fills), + "risk_ok": ok, + "reduce_only": self.risk_manager.state.reduce_only, + "safe_mode": self.risk_manager.state.safe_mode, + }) + + def _handle_shutdown(self, signum, frame): + """Override to push shutdown event instead of setting signal handler.""" + log.info("Engine shutdown signal received") + self._running = False + + def _shutdown(self): + """Override to push shutdown summary.""" + super()._shutdown() + + agent_id = self.strategy.strategy_id + pos = self.position_tracker.get_agent_position(agent_id, self.instrument) + elapsed = (time.time() * 1000 - self.start_time_ms) / 1000 + + try: + snap = self.hl.get_snapshot(self.instrument) + mid = Decimal(str(snap.mid_price)) if snap.mid_price > 0 else pos.avg_entry_price + except Exception: + mid = pos.avg_entry_price + + stats = self.order_manager.stats + self._push_event({ + "type": "shutdown", + "tick_count": self.tick_count, + "total_placed": stats["total_placed"], + "total_filled": stats["total_filled"], + "total_pnl": float(pos.total_pnl(mid)), + "elapsed_s": elapsed, + }) + + +class EngineBridge: + """Manages TradingEngine lifecycle from async Telegram context.""" + + def __init__(self, event_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop): + self.event_queue = event_queue + self.loop = loop + self.engine: Optional[NotifyingEngine] = None + self.engine_thread: Optional[threading.Thread] = None + self._paused = False + + def is_running(self) -> bool: + return ( + self.engine is not None + and self.engine._running + and self.engine_thread is not None + and self.engine_thread.is_alive() + ) + + def start_agent( + self, + strategy_name: str, + instrument: str, + mainnet: bool = False, + risk_overrides: Optional[Dict[str, Any]] = None, + dry_run: bool = False, + mock: bool = False, + ) -> Dict[str, Any]: + """Start a trading agent in a background thread.""" + if self.is_running(): + raise RuntimeError("Agent is already running. Stop it first.") + + import sys + from pathlib import Path + + project_root = str(Path(__file__).resolve().parent.parent) + if project_root not in sys.path: + sys.path.insert(0, project_root) + + from cli.config import TradingConfig + from cli.strategy_registry import resolve_instrument, resolve_strategy_path + from sdk.strategy_sdk.loader import load_strategy + + # Build config + cfg = TradingConfig() + cfg.strategy = strategy_name + cfg.instrument = resolve_instrument(instrument) + cfg.mainnet = mainnet + cfg.dry_run = dry_run + + if risk_overrides: + for key, val in risk_overrides.items(): + if hasattr(cfg, key): + setattr(cfg, key, val) + + # Network guard + if mainnet: + env_testnet = os.environ.get("HL_TESTNET", "true").lower() + if env_testnet == "true": + raise RuntimeError( + "Cannot deploy on mainnet: HL_TESTNET=true in environment. " + "Set HL_TESTNET=false first." + ) + + # Resolve strategy + strategy_path = resolve_strategy_path(cfg.strategy) + strategy_cls = load_strategy(strategy_path) + strategy_instance = strategy_cls(strategy_id=cfg.strategy, **dict(cfg.strategy_params)) + + # Build HL adapter + if mock or dry_run: + from cli.hl_adapter import DirectMockProxy + hl = DirectMockProxy() + else: + from cli.hl_adapter import DirectHLProxy + from parent.hl_proxy import HLProxy + + private_key = cfg.get_private_key() + raw_hl = HLProxy(private_key=private_key, testnet=not cfg.mainnet) + hl = DirectHLProxy(raw_hl) + + # Builder fee + builder_cfg = cfg.get_builder_config() + builder_info = builder_cfg.to_builder_info() + + # Create engine + self.engine = NotifyingEngine( + event_queue=self.event_queue, + loop=self.loop, + hl=hl, + strategy=strategy_instance, + instrument=cfg.instrument, + tick_interval=cfg.tick_interval, + dry_run=cfg.dry_run, + data_dir=cfg.data_dir, + risk_limits=cfg.to_risk_limits(), + builder=builder_info, + ) + + # Start in background thread + self.engine_thread = threading.Thread( + target=self._run_engine, + name="trading-engine", + daemon=True, + ) + self.engine_thread.start() + self._paused = False + + log.info("Agent started: strategy=%s instrument=%s mainnet=%s", + strategy_name, instrument, mainnet) + return {"status": "started", "strategy": strategy_name, "instrument": instrument} + + def _run_engine(self) -> None: + """Engine thread entry point.""" + try: + self.engine.run(resume=True) + except Exception as e: + log.error("Engine crashed: %s", e, exc_info=True) + self.engine._push_event({ + "type": "error", + "message": f"Engine crashed: {e}", + }) + + def stop_agent(self) -> Dict[str, Any]: + """Stop the running agent. Returns shutdown summary.""" + if not self.engine: + return {"status": "not_running"} + + self.engine._running = False + + if self.engine_thread and self.engine_thread.is_alive(): + self.engine_thread.join(timeout=30) + + stats = self.engine.order_manager.stats if self.engine else {} + agent_id = self.engine.strategy.strategy_id if self.engine else "unknown" + elapsed = (time.time() * 1000 - self.engine.start_time_ms) / 1000 if self.engine else 0 + + # Get final PnL + total_pnl = 0.0 + if self.engine: + pos = self.engine.position_tracker.get_agent_position(agent_id, self.engine.instrument) + try: + snap = self.engine.hl.get_snapshot(self.engine.instrument) + mid = Decimal(str(snap.mid_price)) + except Exception: + mid = pos.avg_entry_price + total_pnl = float(pos.total_pnl(mid)) + + result = { + "status": "stopped", + "tick_count": self.engine.tick_count if self.engine else 0, + "total_placed": stats.get("total_placed", 0), + "total_filled": stats.get("total_filled", 0), + "total_pnl": total_pnl, + "elapsed_s": elapsed, + } + + self.engine = None + self.engine_thread = None + return result + + def pause_agent(self) -> None: + """Pause the engine (stops ticking but keeps state).""" + if self.engine: + self.engine._running = False + self._paused = True + + def resume_agent(self) -> None: + """Resume from paused state.""" + if not self.engine or not self._paused: + raise RuntimeError("No paused agent to resume") + + self.engine_thread = threading.Thread( + target=self._run_engine, + name="trading-engine", + daemon=True, + ) + self.engine._running = True + self.engine_thread.start() + self._paused = False + + def get_status(self) -> Dict[str, Any]: + """Get current engine status (thread-safe read of engine state).""" + if not self.engine: + return {"running": False} + + agent_id = self.engine.strategy.strategy_id + pos = self.engine.position_tracker.get_agent_position(agent_id, self.engine.instrument) + mid_dec = Decimal(str(1.0)) + + try: + snap = self.engine.hl.get_snapshot(self.engine.instrument) + mid_dec = Decimal(str(snap.mid_price)) + except Exception: + mid_dec = pos.avg_entry_price if pos.avg_entry_price > 0 else Decimal("1") + + elapsed = (time.time() * 1000 - self.engine.start_time_ms) / 1000 + + return { + "running": self.engine._running, + "strategy": agent_id, + "instrument": self.engine.instrument, + "tick_count": self.engine.tick_count, + "pos_qty": float(pos.net_qty), + "avg_entry": float(pos.avg_entry_price), + "upnl": float(pos.unrealized_pnl(mid_dec)), + "rpnl": float(pos.realized_pnl), + "elapsed_s": elapsed, + "risk_ok": self.engine.risk_manager.can_trade(), + "reduce_only": self.engine.risk_manager.state.reduce_only, + "safe_mode": self.engine.risk_manager.state.safe_mode, + } diff --git a/tg_bot/formatters.py b/tg_bot/formatters.py new file mode 100644 index 0000000..6f5759b --- /dev/null +++ b/tg_bot/formatters.py @@ -0,0 +1,282 @@ +"""Telegram message formatting — plain text cards and inline keyboards.""" +from __future__ import annotations + +import time +from typing import Any, Dict, List, Optional + +from telegram import InlineKeyboardButton, InlineKeyboardMarkup + + +def escape_md(text: str) -> str: + """Escape special characters for MarkdownV2.""" + special = r"_*[]()~`>#+-=|{}.!" + for ch in special: + text = text.replace(ch, f"\\{ch}") + return text + + +# ── Inline Keyboards ── + + +def wallet_keyboard() -> InlineKeyboardMarkup: + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton("Create New Wallet", callback_data="wallet_create"), + InlineKeyboardButton("Import Private Key", callback_data="wallet_import"), + ], + ]) + + +def strategy_keyboard(strategies: Dict[str, Dict[str, Any]], page: int = 0, per_page: int = 8) -> InlineKeyboardMarkup: + """Paginated strategy selection keyboard.""" + names = sorted(strategies.keys()) + total_pages = (len(names) + per_page - 1) // per_page + start = page * per_page + page_items = names[start:start + per_page] + + rows = [] + for i in range(0, len(page_items), 2): + row = [InlineKeyboardButton(name, callback_data=f"strat_{name}") for name in page_items[i:i + 2]] + rows.append(row) + + # Pagination buttons + nav = [] + if page > 0: + nav.append(InlineKeyboardButton("<< Prev", callback_data=f"strat_page_{page - 1}")) + if page < total_pages - 1: + nav.append(InlineKeyboardButton("Next >>", callback_data=f"strat_page_{page + 1}")) + if nav: + rows.append(nav) + + return InlineKeyboardMarkup(rows) + + +def instrument_keyboard() -> InlineKeyboardMarkup: + """Common instruments + YEX markets.""" + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton("ETH-PERP", callback_data="inst_ETH-PERP"), + InlineKeyboardButton("BTC-PERP", callback_data="inst_BTC-PERP"), + ], + [ + InlineKeyboardButton("SOL-PERP", callback_data="inst_SOL-PERP"), + InlineKeyboardButton("HYPE-PERP", callback_data="inst_HYPE-PERP"), + ], + [ + InlineKeyboardButton("VXX-USDYP", callback_data="inst_VXX-USDYP"), + InlineKeyboardButton("US3M-USDYP", callback_data="inst_US3M-USDYP"), + ], + [ + InlineKeyboardButton("BTCSWP-USDYP", callback_data="inst_BTCSWP-USDYP"), + ], + ]) + + +def preset_keyboard() -> InlineKeyboardMarkup: + """Risk preset selection.""" + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton("Conservative", callback_data="preset_conservative"), + InlineKeyboardButton("Default", callback_data="preset_default"), + InlineKeyboardButton("Aggressive", callback_data="preset_aggressive"), + ], + ]) + + +def confirm_keyboard(mainnet: bool = False) -> InlineKeyboardMarkup: + """Deployment confirmation.""" + rows = [ + [ + InlineKeyboardButton("Deploy Agent", callback_data="confirm_deploy"), + InlineKeyboardButton("Cancel", callback_data="confirm_cancel"), + ], + ] + return InlineKeyboardMarkup(rows) + + +def mainnet_confirm_keyboard() -> InlineKeyboardMarkup: + """Double confirmation for mainnet.""" + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton( + "YES - Deploy on MAINNET with REAL funds", + callback_data="mainnet_confirm_yes", + ), + ], + [ + InlineKeyboardButton("Cancel", callback_data="mainnet_confirm_no"), + ], + ]) + + +def control_keyboard() -> InlineKeyboardMarkup: + """Agent control buttons.""" + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton("Pause", callback_data="ctrl_pause"), + InlineKeyboardButton("Resume", callback_data="ctrl_resume"), + InlineKeyboardButton("Stop", callback_data="ctrl_stop"), + ], + [ + InlineKeyboardButton("Status", callback_data="ctrl_status"), + InlineKeyboardButton("Balance", callback_data="ctrl_balance"), + ], + ]) + + +# ── Message Cards ── + + +def welcome_card(has_wallet: bool, address: str = "", balance: float = 0.0) -> str: + if has_wallet: + return ( + "Nunchi Trading Agent\n" + "━━━━━━━━━━━━━━━━━━━━\n" + f"Wallet: {address[:8]}...{address[-6:]}\n" + f"Balance: ${balance:.2f}\n\n" + "Commands:\n" + "/deploy - Deploy a trading agent\n" + "/status - Check agent status\n" + "/balance - Account balance\n" + "/stop - Stop running agent\n" + "/help - All commands" + ) + return ( + "Nunchi Trading Agent\n" + "━━━━━━━━━━━━━━━━━━━━\n" + "Deploy autonomous trading agents on Hyperliquid\n" + "directly from Telegram.\n\n" + "First, let's set up your wallet." + ) + + +def wallet_created_card(address: str, network: str) -> str: + return ( + "Wallet Created\n" + "━━━━━━━━━━━━━━━━━━━━\n" + f"Address: {address}\n" + f"Network: {network}\n\n" + f"{'Claim testnet USDyP: /claim' if network == 'testnet' else 'Deposit USDC via Hyperliquid web UI'}\n\n" + "Your key is encrypted and stored locally.\n" + "Use /deploy to start a trading agent." + ) + + +def strategy_info_card(name: str, info: Dict[str, Any]) -> str: + params = "\n".join(f" {k}: {v}" for k, v in info.get("params", {}).items()) + return ( + f"Strategy: {name}\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"{info['description']}\n\n" + f"Default Parameters:\n{params}" + ) + + +def deploy_confirm_card( + strategy: str, + instrument: str, + preset: str, + network: str, + risk_params: Dict[str, Any], +) -> str: + return ( + "Deploy Confirmation\n" + "━━━━━━━━━━━━━━━━━━━━\n" + f"Strategy: {strategy}\n" + f"Instrument: {instrument}\n" + f"Preset: {preset}\n" + f"Network: {network.upper()}\n\n" + f"Risk Limits:\n" + f" Max Position: {risk_params.get('max_position_qty', 'default')}\n" + f" Max Notional: ${risk_params.get('max_notional_usd', 'default')}\n" + f" Max Leverage: {risk_params.get('max_leverage', 'default')}x\n" + ) + + +def fill_card( + side: str, + quantity: str, + price: str, + instrument: str, + strategy: str, + tick: int, +) -> str: + direction = "BUY" if side == "buy" else "SELL" + return ( + f"Fill: {direction} {quantity} {instrument} @ ${price}\n" + f"Strategy: {strategy} | Tick: {tick}" + ) + + +def status_card( + strategy: str, + instrument: str, + network: str, + tick_count: int, + pos_qty: float, + avg_entry: float, + upnl: float, + rpnl: float, + elapsed_s: float, + risk_ok: bool, +) -> str: + total_pnl = upnl + rpnl + sign = lambda v: f"+{v:.2f}" if v >= 0 else f"{v:.2f}" + elapsed_min = int(elapsed_s // 60) + + return ( + "Agent Status\n" + "━━━━━━━━━━━━━━━━━━━━\n" + f"Strategy: {strategy}\n" + f"Instrument: {instrument}\n" + f"Network: {network}\n" + f"Ticks: {tick_count} ({elapsed_min}min)\n\n" + f"Position: {sign(pos_qty)} @ ${avg_entry:.4f}\n" + f"PnL: uPnL ${sign(upnl)} | rPnL ${sign(rpnl)} | Total ${sign(total_pnl)}\n" + f"Risk: {'OK' if risk_ok else 'BLOCKED'}" + ) + + +def shutdown_card( + tick_count: int, + total_placed: int, + total_filled: int, + total_pnl: float, + elapsed_s: float, +) -> str: + sign = lambda v: f"+{v:.2f}" if v >= 0 else f"{v:.2f}" + return ( + "Agent Stopped\n" + "━━━━━━━━━━━━━━━━━━━━\n" + f"Ticks: {tick_count}\n" + f"Orders: {total_placed} placed, {total_filled} filled\n" + f"PnL: ${sign(total_pnl)}\n" + f"Runtime: {int(elapsed_s)}s" + ) + + +def balance_card(address: str, balance: float, network: str) -> str: + return ( + "Account Balance\n" + "━━━━━━━━━━━━━━━━━━━━\n" + f"Address: {address[:8]}...{address[-6:]}\n" + f"Balance: ${balance:.2f}\n" + f"Network: {network}" + ) + + +def help_card() -> str: + return ( + "Nunchi Bot Commands\n" + "━━━━━━━━━━━━━━━━━━━━\n" + "/start - Setup wallet\n" + "/deploy - Deploy trading agent\n" + "/status - Agent status + PnL\n" + "/balance - Account balance\n" + "/pause - Pause agent\n" + "/resume - Resume agent\n" + "/stop - Stop agent\n" + "/switch - Change strategy\n" + "/apex - APEX multi-strategy mode\n" + "/help - This message" + ) diff --git a/tg_bot/handlers/__init__.py b/tg_bot/handlers/__init__.py new file mode 100644 index 0000000..28ec799 --- /dev/null +++ b/tg_bot/handlers/__init__.py @@ -0,0 +1 @@ +"""Telegram bot command handlers.""" diff --git a/tg_bot/handlers/apex.py b/tg_bot/handlers/apex.py new file mode 100644 index 0000000..3d9803e --- /dev/null +++ b/tg_bot/handlers/apex.py @@ -0,0 +1,118 @@ +"""APEX multi-strategy orchestration mode via Telegram.""" +from __future__ import annotations + +import logging +import subprocess +import sys +from pathlib import Path + +from telegram import Update +from telegram.ext import CommandHandler, ContextTypes + +from tg_bot.auth import authorized + +log = logging.getLogger("tg_bot.apex") + +# Track the APEX subprocess +_apex_proc: subprocess.Popen | None = None + + +@authorized +async def apex_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Start APEX multi-slot orchestration.""" + global _apex_proc + + if _apex_proc and _apex_proc.poll() is None: + await update.message.reply_text( + "APEX is already running.\n" + "Use /apex_stop to stop it, or /apex_status to check." + ) + return + + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + project_root = str(Path(__file__).resolve().parent.parent.parent) + cmd = [sys.executable, "-m", "cli.main", "apex", "run"] + if network == "mainnet": + cmd.append("--mainnet") + + try: + _apex_proc = subprocess.Popen( + cmd, + cwd=project_root, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + ) + await update.message.reply_text( + f"APEX started (pid={_apex_proc.pid})\n" + f"Network: {network}\n\n" + "Use /apex_status to check, /apex_stop to stop." + ) + log.info("APEX started (pid=%d, network=%s)", _apex_proc.pid, network) + except Exception as e: + log.error("Failed to start APEX: %s", e) + await update.message.reply_text(f"Failed to start APEX: {e}") + + +@authorized +async def apex_status_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Check APEX status.""" + global _apex_proc + + if not _apex_proc: + await update.message.reply_text("APEX is not running. Use /apex to start.") + return + + rc = _apex_proc.poll() + if rc is not None: + await update.message.reply_text(f"APEX exited with code {rc}. Use /apex to restart.") + _apex_proc = None + return + + # Try to get status from CLI + project_root = str(Path(__file__).resolve().parent.parent.parent) + try: + result = subprocess.run( + [sys.executable, "-m", "cli.main", "apex", "status"], + cwd=project_root, + capture_output=True, + text=True, + timeout=10, + ) + # Strip ANSI codes for Telegram + import re + clean = re.sub(r'\033\[[0-9;]*m', '', result.stdout) + await update.message.reply_text(clean[:4000] if clean else "APEX running (no status output)") + except Exception as e: + await update.message.reply_text(f"APEX running (pid={_apex_proc.pid}), status check failed: {e}") + + +@authorized +async def apex_stop_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Stop APEX.""" + global _apex_proc + + if not _apex_proc or _apex_proc.poll() is not None: + await update.message.reply_text("APEX is not running.") + _apex_proc = None + return + + import signal + _apex_proc.send_signal(signal.SIGTERM) + try: + _apex_proc.wait(timeout=15) + await update.message.reply_text("APEX stopped.") + except subprocess.TimeoutExpired: + _apex_proc.kill() + await update.message.reply_text("APEX killed (did not stop gracefully).") + + _apex_proc = None + + +def register_apex_handlers(app) -> None: + """Register APEX command handlers.""" + app.add_handler(CommandHandler("apex", apex_cmd)) + app.add_handler(CommandHandler("apex_status", apex_status_cmd)) + app.add_handler(CommandHandler("apex_stop", apex_stop_cmd)) diff --git a/tg_bot/handlers/control.py b/tg_bot/handlers/control.py new file mode 100644 index 0000000..48ca31e --- /dev/null +++ b/tg_bot/handlers/control.py @@ -0,0 +1,218 @@ +"""Agent control commands — status, pause, resume, stop, balance.""" +from __future__ import annotations + +import logging +import time + +from telegram import Update +from telegram.ext import CallbackQueryHandler, CommandHandler, ContextTypes + +from tg_bot.auth import authorized +from tg_bot.formatters import balance_card, control_keyboard, help_card, status_card, shutdown_card + +log = logging.getLogger("telegram.control") + + +def _get_bridge(context): + return context.bot_data.get("engine_bridge") + + +@authorized +async def status_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Show current agent status.""" + bridge = _get_bridge(context) + if not bridge or not bridge.is_running(): + await update.message.reply_text("No agent is running. Use /deploy to start one.") + return + + info = bridge.get_status() + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + text = status_card( + strategy=info["strategy"], + instrument=info["instrument"], + network=network, + tick_count=info["tick_count"], + pos_qty=info["pos_qty"], + avg_entry=info["avg_entry"], + upnl=info["upnl"], + rpnl=info["rpnl"], + elapsed_s=info["elapsed_s"], + risk_ok=info["risk_ok"], + ) + await update.message.reply_text(text, reply_markup=control_keyboard()) + + +@authorized +async def pause_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Pause the running agent.""" + bridge = _get_bridge(context) + if not bridge or not bridge.is_running(): + await update.message.reply_text("No agent is running.") + return + + bridge.pause_agent() + await update.message.reply_text("Agent paused. Use /resume to continue.") + + +@authorized +async def resume_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Resume a paused agent.""" + bridge = _get_bridge(context) + if not bridge: + await update.message.reply_text("No agent to resume. Use /deploy first.") + return + + if bridge.is_running(): + await update.message.reply_text("Agent is already running.") + return + + try: + bridge.resume_agent() + await update.message.reply_text("Agent resumed.") + except Exception as e: + await update.message.reply_text(f"Failed to resume: {e}") + + +@authorized +async def stop_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Stop the running agent.""" + bridge = _get_bridge(context) + if not bridge or not bridge.is_running(): + await update.message.reply_text("No agent is running.") + return + + summary = bridge.stop_agent() + text = shutdown_card( + tick_count=summary.get("tick_count", 0), + total_placed=summary.get("total_placed", 0), + total_filled=summary.get("total_filled", 0), + total_pnl=summary.get("total_pnl", 0.0), + elapsed_s=summary.get("elapsed_s", 0.0), + ) + await update.message.reply_text(text) + + +@authorized +async def balance_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Show account balance.""" + from cli.keystore import list_keystores + + keystores = list_keystores() + if not keystores: + await update.message.reply_text("No wallet found. Use /start first.") + return + + address = keystores[0]["address"] + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + try: + from common.credentials import resolve_private_key + from parent.hl_proxy import HLProxy + from cli.hl_adapter import DirectHLProxy + + private_key = resolve_private_key(venue="hl") + raw_hl = HLProxy(private_key=private_key, testnet=(network != "mainnet")) + hl = DirectHLProxy(raw_hl) + account = hl.get_account_state() + + bal = 0.0 + if "crossMarginSummary" in account: + bal = float(account["crossMarginSummary"].get("accountValue", 0)) + elif "marginSummary" in account: + bal = float(account["marginSummary"].get("accountValue", 0)) + + await update.message.reply_text(balance_card(address, bal, network)) + except Exception as e: + log.error("Balance check failed: %s", e) + await update.message.reply_text(f"Could not fetch balance: {e}") + + +@authorized +async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Show help.""" + await update.message.reply_text(help_card()) + + +async def control_button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handle inline control button presses.""" + query = update.callback_query + await query.answer() + + action = query.data.replace("ctrl_", "") + + if action == "status": + bridge = _get_bridge(context) + if not bridge or not bridge.is_running(): + await query.edit_message_text("No agent is running.") + return + info = bridge.get_status() + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + text = status_card( + strategy=info["strategy"], + instrument=info["instrument"], + network=network, + tick_count=info["tick_count"], + pos_qty=info["pos_qty"], + avg_entry=info["avg_entry"], + upnl=info["upnl"], + rpnl=info["rpnl"], + elapsed_s=info["elapsed_s"], + risk_ok=info["risk_ok"], + ) + await query.edit_message_text(text, reply_markup=control_keyboard()) + + elif action == "pause": + bridge = _get_bridge(context) + if bridge and bridge.is_running(): + bridge.pause_agent() + await query.edit_message_text("Agent paused. Use /resume to continue.") + else: + await query.edit_message_text("No agent is running.") + + elif action == "resume": + bridge = _get_bridge(context) + if bridge: + bridge.resume_agent() + await query.edit_message_text("Agent resumed.", reply_markup=control_keyboard()) + else: + await query.edit_message_text("No agent to resume.") + + elif action == "stop": + bridge = _get_bridge(context) + if bridge and bridge.is_running(): + summary = bridge.stop_agent() + text = shutdown_card( + tick_count=summary.get("tick_count", 0), + total_placed=summary.get("total_placed", 0), + total_filled=summary.get("total_filled", 0), + total_pnl=summary.get("total_pnl", 0.0), + elapsed_s=summary.get("elapsed_s", 0.0), + ) + await query.edit_message_text(text) + else: + await query.edit_message_text("No agent is running.") + + elif action == "balance": + from cli.keystore import list_keystores + keystores = list_keystores() + if not keystores: + await query.edit_message_text("No wallet found.") + return + # Simplified — just show address + address = keystores[0]["address"] + await query.edit_message_text(f"Wallet: {address}\nUse /balance for full details.") + + +def register_control_handlers(app) -> None: + """Register all control command handlers.""" + app.add_handler(CommandHandler("status", status_cmd)) + app.add_handler(CommandHandler("pause", pause_cmd)) + app.add_handler(CommandHandler("resume", resume_cmd)) + app.add_handler(CommandHandler("stop", stop_cmd)) + app.add_handler(CommandHandler("balance", balance_cmd)) + app.add_handler(CommandHandler("help", help_cmd)) + app.add_handler(CallbackQueryHandler(control_button_callback, pattern=r"^ctrl_")) diff --git a/tg_bot/handlers/start.py b/tg_bot/handlers/start.py new file mode 100644 index 0000000..b4bbfd0 --- /dev/null +++ b/tg_bot/handlers/start.py @@ -0,0 +1,194 @@ +"""Wallet creation and onboarding flow.""" +from __future__ import annotations + +import logging +import os +import secrets + +from telegram import Update +from telegram.ext import ( + CallbackQueryHandler, + CommandHandler, + ContextTypes, + ConversationHandler, + MessageHandler, + filters, +) + +from tg_bot.auth import authorized +from tg_bot.formatters import ( + wallet_created_card, + wallet_keyboard, + welcome_card, +) + +log = logging.getLogger("telegram.start") + +# Conversation states +CHOOSE_ACTION, IMPORT_KEY = range(2) + + +@authorized +async def start_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Entry point: /start — check wallet, show welcome.""" + from cli.keystore import list_keystores + + keystores = list_keystores() + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + if keystores: + address = keystores[0]["address"] + # Try to get balance + balance = 0.0 + try: + balance = await _get_balance(address, network) + except Exception: + pass + await update.message.reply_text(welcome_card(True, address, balance)) + return ConversationHandler.END + + await update.message.reply_text( + welcome_card(False), + reply_markup=wallet_keyboard(), + ) + return CHOOSE_ACTION + + +async def wallet_create_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Create a new encrypted wallet.""" + query = update.callback_query + await query.answer() + + from cli.keystore import create_keystore, ENV_FILE + from eth_account import Account + + # Generate random key + password + account = Account.create() + private_key = account.key.hex() + if not private_key.startswith("0x"): + private_key = "0x" + private_key + password = secrets.token_urlsafe(24) + + # Save to keystore + ks_path = create_keystore(private_key, password) + + # Persist password for auto-unlock + ENV_FILE.parent.mkdir(parents=True, exist_ok=True) + lines = [] + if ENV_FILE.exists(): + lines = ENV_FILE.read_text().splitlines() + lines = [l for l in lines if not l.startswith("HL_KEYSTORE_PASSWORD=")] + lines.append(f"HL_KEYSTORE_PASSWORD={password}") + ENV_FILE.write_text("\n".join(lines) + "\n") + + address = account.address + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + log.info("Created wallet %s (keystore: %s)", address, ks_path) + await query.edit_message_text(wallet_created_card(address, network)) + return ConversationHandler.END + + +async def wallet_import_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Prompt user to send private key.""" + query = update.callback_query + await query.answer() + await query.edit_message_text( + "Send your private key (hex format with 0x prefix).\n" + "The message will be deleted immediately for security." + ) + return IMPORT_KEY + + +async def receive_private_key(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Receive and encrypt a private key. Delete the user's message immediately.""" + import secrets + from cli.keystore import create_keystore, ENV_FILE + + # Delete the message containing the private key immediately + try: + await update.message.delete() + except Exception: + log.warning("Could not delete message containing private key") + + private_key = update.message.text.strip() + if not private_key.startswith("0x"): + private_key = "0x" + private_key + + # Validate + try: + from eth_account import Account + account = Account.from_key(private_key) + except Exception: + await update.message.reply_text( + "Invalid private key. Must be a 64-character hex string (with or without 0x prefix).\n" + "Try again or use /start to create a new wallet." + ) + return ConversationHandler.END + + password = secrets.token_urlsafe(24) + ks_path = create_keystore(private_key, password) + + # Persist password + ENV_FILE.parent.mkdir(parents=True, exist_ok=True) + lines = [] + if ENV_FILE.exists(): + lines = ENV_FILE.read_text().splitlines() + lines = [l for l in lines if not l.startswith("HL_KEYSTORE_PASSWORD=")] + lines.append(f"HL_KEYSTORE_PASSWORD={password}") + ENV_FILE.write_text("\n".join(lines) + "\n") + + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + log.info("Imported wallet %s (keystore: %s)", account.address, ks_path) + await context.bot.send_message( + chat_id=update.effective_chat.id, + text=wallet_created_card(account.address, network), + ) + return ConversationHandler.END + + +async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await update.message.reply_text("Cancelled.") + return ConversationHandler.END + + +async def _get_balance(address: str, network: str) -> float: + """Get account balance from HL. Returns 0 on failure.""" + try: + from common.credentials import resolve_private_key + from parent.hl_proxy import HLProxy + from cli.hl_adapter import DirectHLProxy + + private_key = resolve_private_key(venue="hl") + testnet = network != "mainnet" + raw_hl = HLProxy(private_key=private_key, testnet=testnet) + hl = DirectHLProxy(raw_hl) + account = hl.get_account_state() + if "crossMarginSummary" in account: + return float(account["crossMarginSummary"].get("accountValue", 0)) + if "marginSummary" in account: + return float(account["marginSummary"].get("accountValue", 0)) + except Exception as e: + log.debug("Balance check failed: %s", e) + return 0.0 + + +def build_start_handler() -> ConversationHandler: + """Build the /start conversation handler.""" + return ConversationHandler( + entry_points=[CommandHandler("start", start_cmd)], + states={ + CHOOSE_ACTION: [ + CallbackQueryHandler(wallet_create_callback, pattern="^wallet_create$"), + CallbackQueryHandler(wallet_import_callback, pattern="^wallet_import$"), + ], + IMPORT_KEY: [ + MessageHandler(filters.TEXT & ~filters.COMMAND, receive_private_key), + ], + }, + fallbacks=[CommandHandler("cancel", cancel)], + ) diff --git a/tg_bot/handlers/strategy.py b/tg_bot/handlers/strategy.py new file mode 100644 index 0000000..1691dd9 --- /dev/null +++ b/tg_bot/handlers/strategy.py @@ -0,0 +1,267 @@ +"""Strategy selection and agent deployment flow.""" +from __future__ import annotations + +import logging + +from telegram import Update +from telegram.ext import ( + CallbackQueryHandler, + CommandHandler, + ContextTypes, + ConversationHandler, +) + +from tg_bot.auth import authorized +from tg_bot.formatters import ( + confirm_keyboard, + deploy_confirm_card, + instrument_keyboard, + mainnet_confirm_keyboard, + preset_keyboard, + strategy_info_card, + strategy_keyboard, +) + +log = logging.getLogger("telegram.strategy") + +# Conversation states +CHOOSE_STRATEGY, CHOOSE_INSTRUMENT, CHOOSE_PRESET, CONFIRM, MAINNET_CONFIRM = range(5) + +# Risk presets +PRESETS = { + "conservative": { + "max_position_qty": 2.0, + "max_notional_usd": 5000.0, + "max_order_size": 1.0, + "max_leverage": 2.0, + "tvl": 10000.0, + }, + "default": { + "max_position_qty": 10.0, + "max_notional_usd": 25000.0, + "max_order_size": 5.0, + "max_leverage": 3.0, + "tvl": 100000.0, + }, + "aggressive": { + "max_position_qty": 25.0, + "max_notional_usd": 100000.0, + "max_order_size": 10.0, + "max_leverage": 5.0, + "tvl": 250000.0, + }, +} + + +def _get_registry(): + from cli.strategy_registry import STRATEGY_REGISTRY + return STRATEGY_REGISTRY + + +@authorized +async def deploy_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Entry point: /deploy — select strategy.""" + # Check if agent is already running + bridge = context.bot_data.get("engine_bridge") + if bridge and bridge.is_running(): + await update.message.reply_text( + "An agent is already running. Use /stop first, then /deploy again." + ) + return ConversationHandler.END + + # Check wallet exists + from cli.keystore import list_keystores + if not list_keystores(): + await update.message.reply_text("No wallet found. Use /start first to create one.") + return ConversationHandler.END + + registry = _get_registry() + await update.message.reply_text( + "Choose a strategy:", + reply_markup=strategy_keyboard(registry, page=0), + ) + return CHOOSE_STRATEGY + + +async def strategy_page_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Handle strategy pagination.""" + query = update.callback_query + await query.answer() + page = int(query.data.split("_")[-1]) + registry = _get_registry() + await query.edit_message_reply_markup( + reply_markup=strategy_keyboard(registry, page=page), + ) + return CHOOSE_STRATEGY + + +async def strategy_select_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Handle strategy selection.""" + query = update.callback_query + await query.answer() + strategy_name = query.data.replace("strat_", "") + + registry = _get_registry() + if strategy_name not in registry: + await query.edit_message_text(f"Unknown strategy: {strategy_name}") + return ConversationHandler.END + + context.user_data["deploy_strategy"] = strategy_name + + # Show strategy info + instrument picker + info = registry[strategy_name] + text = strategy_info_card(strategy_name, info) + "\n\nChoose instrument:" + await query.edit_message_text(text, reply_markup=instrument_keyboard()) + return CHOOSE_INSTRUMENT + + +async def instrument_select_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Handle instrument selection.""" + query = update.callback_query + await query.answer() + instrument = query.data.replace("inst_", "") + context.user_data["deploy_instrument"] = instrument + + await query.edit_message_text( + f"Strategy: {context.user_data['deploy_strategy']}\n" + f"Instrument: {instrument}\n\n" + "Choose risk preset:", + reply_markup=preset_keyboard(), + ) + return CHOOSE_PRESET + + +async def preset_select_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Handle preset selection, show confirmation.""" + query = update.callback_query + await query.answer() + preset_name = query.data.replace("preset_", "") + context.user_data["deploy_preset"] = preset_name + risk_params = PRESETS.get(preset_name, PRESETS["default"]) + context.user_data["deploy_risk_params"] = risk_params + + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + text = deploy_confirm_card( + strategy=context.user_data["deploy_strategy"], + instrument=context.user_data["deploy_instrument"], + preset=preset_name, + network=network, + risk_params=risk_params, + ) + await query.edit_message_text(text, reply_markup=confirm_keyboard(mainnet=network == "mainnet")) + return CONFIRM + + +async def confirm_deploy_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Confirm deployment — or gate to mainnet double-confirm.""" + query = update.callback_query + await query.answer() + + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + if network == "mainnet" and config and config.mainnet_confirmation: + await query.edit_message_text( + "WARNING: You are about to deploy on MAINNET with REAL funds.\n" + "Are you absolutely sure?", + reply_markup=mainnet_confirm_keyboard(), + ) + return MAINNET_CONFIRM + + return await _do_deploy(query, context) + + +async def mainnet_confirm_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Mainnet double confirmation.""" + query = update.callback_query + await query.answer() + + if query.data == "mainnet_confirm_yes": + return await _do_deploy(query, context) + else: + await query.edit_message_text("Deployment cancelled.") + return ConversationHandler.END + + +async def confirm_cancel_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + """Cancel deployment.""" + query = update.callback_query + await query.answer() + await query.edit_message_text("Deployment cancelled.") + return ConversationHandler.END + + +async def _do_deploy(query, context: ContextTypes.DEFAULT_TYPE) -> int: + """Actually start the trading engine.""" + strategy = context.user_data["deploy_strategy"] + instrument = context.user_data["deploy_instrument"] + preset = context.user_data["deploy_preset"] + risk_params = context.user_data["deploy_risk_params"] + + config = context.bot_data.get("config") + network = config.default_network if config else "testnet" + + bridge = context.bot_data.get("engine_bridge") + if not bridge: + await query.edit_message_text("Engine bridge not initialized. Contact admin.") + return ConversationHandler.END + + try: + await query.edit_message_text(f"Deploying {strategy} on {instrument}...") + + result = bridge.start_agent( + strategy_name=strategy, + instrument=instrument, + mainnet=(network == "mainnet"), + risk_overrides=risk_params, + ) + + from tg_bot.formatters import control_keyboard + await query.edit_message_text( + f"Agent deployed!\n\n" + f"Strategy: {strategy}\n" + f"Instrument: {instrument}\n" + f"Network: {network}\n" + f"Preset: {preset}\n\n" + f"Use the buttons below or /status to monitor.", + reply_markup=control_keyboard(), + ) + except Exception as e: + log.error("Failed to deploy agent: %s", e, exc_info=True) + await query.edit_message_text(f"Deployment failed: {e}") + + return ConversationHandler.END + + +async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + await update.message.reply_text("Deployment cancelled.") + return ConversationHandler.END + + +def build_deploy_handler() -> ConversationHandler: + """Build the /deploy conversation handler.""" + return ConversationHandler( + entry_points=[CommandHandler("deploy", deploy_cmd)], + states={ + CHOOSE_STRATEGY: [ + CallbackQueryHandler(strategy_page_callback, pattern=r"^strat_page_\d+$"), + CallbackQueryHandler(strategy_select_callback, pattern=r"^strat_(?!page_)\w+$"), + ], + CHOOSE_INSTRUMENT: [ + CallbackQueryHandler(instrument_select_callback, pattern=r"^inst_"), + ], + CHOOSE_PRESET: [ + CallbackQueryHandler(preset_select_callback, pattern=r"^preset_"), + ], + CONFIRM: [ + CallbackQueryHandler(confirm_deploy_callback, pattern="^confirm_deploy$"), + CallbackQueryHandler(confirm_cancel_callback, pattern="^confirm_cancel$"), + ], + MAINNET_CONFIRM: [ + CallbackQueryHandler(mainnet_confirm_callback, pattern=r"^mainnet_confirm_"), + ], + }, + fallbacks=[CommandHandler("cancel", cancel)], + ) diff --git a/tg_bot/notifier.py b/tg_bot/notifier.py new file mode 100644 index 0000000..342f272 --- /dev/null +++ b/tg_bot/notifier.py @@ -0,0 +1,134 @@ +"""Event notification system — pushes engine events to Telegram with throttling.""" +from __future__ import annotations + +import asyncio +import logging +import time +from typing import Any, Dict, Optional + +from telegram import Bot + +from tg_bot.formatters import fill_card, shutdown_card, status_card + +log = logging.getLogger("telegram.notifier") + + +class Notifier: + """Reads events from engine queue and sends Telegram messages with throttling.""" + + def __init__( + self, + bot: Bot, + chat_id: int, + event_queue: asyncio.Queue, + pnl_interval_s: int = 60, + tick_summary_interval_s: int = 300, + ): + self.bot = bot + self.chat_id = chat_id + self.event_queue = event_queue + self.pnl_interval_s = pnl_interval_s + self.tick_summary_interval_s = tick_summary_interval_s + self._last_pnl_sent = 0.0 + self._last_tick_summary = 0.0 + self._task: Optional[asyncio.Task] = None + + def start(self) -> None: + """Start the notifier as a background task.""" + self._task = asyncio.create_task(self._run()) + log.info("Notifier started (pnl_interval=%ds, tick_summary=%ds)", + self.pnl_interval_s, self.tick_summary_interval_s) + + def stop(self) -> None: + """Stop the notifier.""" + if self._task: + self._task.cancel() + self._task = None + + async def _run(self) -> None: + """Main loop: read events from queue, format, send.""" + while True: + try: + event = await self.event_queue.get() + await self._handle_event(event) + except asyncio.CancelledError: + break + except Exception as e: + log.error("Notifier error: %s", e, exc_info=True) + + async def _handle_event(self, event: Dict[str, Any]) -> None: + event_type = event.get("type") + + if event_type == "fill": + await self._send_fill(event) + elif event_type == "tick": + await self._maybe_send_tick_summary(event) + elif event_type == "shutdown": + await self._send_shutdown(event) + elif event_type == "error": + await self._send_error(event) + elif event_type == "risk_alert": + await self._send_risk_alert(event) + + async def _send_fill(self, event: Dict[str, Any]) -> None: + """Always send fill notifications.""" + text = fill_card( + side=event["side"], + quantity=event["quantity"], + price=event["price"], + instrument=event["instrument"], + strategy=event["strategy"], + tick=event["tick"], + ) + await self._send(text) + + async def _maybe_send_tick_summary(self, event: Dict[str, Any]) -> None: + """Send PnL updates at throttled intervals.""" + now = time.time() + + # Always-send conditions: safe mode or reduce-only transitions + if event.get("safe_mode") or event.get("reduce_only"): + await self._send( + f"RISK ALERT: {'Safe mode' if event.get('safe_mode') else 'Reduce-only'} active\n" + f"Strategy: {event['strategy']} | Tick: {event['tick_count']}" + ) + return + + # Throttled PnL update + if now - self._last_tick_summary < self.tick_summary_interval_s: + return + + self._last_tick_summary = now + sign = lambda v: f"+{v:.2f}" if v >= 0 else f"{v:.2f}" + total = event.get("upnl", 0) + event.get("rpnl", 0) + text = ( + f"T{event['tick_count']} | {event['instrument']} mid={event['mid_price']:.4f}\n" + f"Pos: {sign(event['pos_qty'])} | PnL: ${sign(total)}" + ) + await self._send(text) + + async def _send_shutdown(self, event: Dict[str, Any]) -> None: + """Always send shutdown summary.""" + text = shutdown_card( + tick_count=event["tick_count"], + total_placed=event["total_placed"], + total_filled=event["total_filled"], + total_pnl=event["total_pnl"], + elapsed_s=event["elapsed_s"], + ) + await self._send(text) + + async def _send_error(self, event: Dict[str, Any]) -> None: + """Always send error notifications.""" + await self._send(f"ERROR: {event.get('message', 'Unknown error')}") + + async def _send_risk_alert(self, event: Dict[str, Any]) -> None: + """Always send risk alerts.""" + await self._send(f"RISK ALERT: {event.get('message', 'Risk event triggered')}") + + async def _send(self, text: str) -> None: + """Send a message to the configured chat.""" + try: + await self.bot.send_message(chat_id=self.chat_id, text=text) + except Exception as e: + log.error("Failed to send Telegram message: %s", e)