Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ RUN apt-get update \

WORKDIR /app
COPY . .
RUN pip install --no-cache-dir -e ".[mcp]"
RUN pip install --no-cache-dir -e ".[mcp,telegram]"

# Persistent state volume (Railway mounts here)
RUN mkdir -p /data
Expand Down
58 changes: 58 additions & 0 deletions cli/commands/telegram_cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""hl telegram — start the Telegram bot interface."""
from __future__ import annotations

import logging
import sys
from pathlib import Path

import typer

telegram_app = typer.Typer()


@telegram_app.command("start")
def telegram_start(
mainnet: bool = typer.Option(
False, "--mainnet",
help="Connect to mainnet (default: testnet)",
),
dry_run: bool = typer.Option(
False, "--dry-run",
help="Agents run in dry-run mode (no real orders)",
),
):
"""Start the Telegram bot for deploying and controlling trading agents."""
project_root = str(Path(__file__).resolve().parent.parent.parent)
if project_root not in sys.path:
sys.path.insert(0, project_root)

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)-14s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S",
)

from tg_bot.config import TelegramBotConfig
from tg_bot.bot import run_bot

config = TelegramBotConfig.from_env()

if mainnet:
config.default_network = "mainnet"

if not config.bot_token:
typer.echo(
"ERROR: TELEGRAM_BOT_TOKEN not set.\n"
"1. Create a bot via @BotFather on Telegram\n"
"2. Set TELEGRAM_BOT_TOKEN=<your-token> in your environment\n"
"3. Run this command again",
err=True,
)
raise typer.Exit(code=1)

typer.echo(f"Network: {config.default_network}")
typer.echo(f"Chat IDs: {config.allowed_chat_ids or 'auto-detect on first /start'}")
typer.echo("Bot starting... (Ctrl+C to stop)")
typer.echo("")

run_bot(config)
2 changes: 2 additions & 0 deletions cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from cli.commands.skills import skills_app
from cli.commands.journal import journal_app
from cli.commands.keys import keys_app
from cli.commands.telegram_cmd import telegram_app

app.command("run", help="Start autonomous trading with a strategy")(run_cmd)
app.command("status", help="Show positions, PnL, and risk state")(status_cmd)
Expand All @@ -53,6 +54,7 @@
app.add_typer(skills_app, name="skills", help="Skill discovery and registry")
app.add_typer(journal_app, name="journal", help="Trade journal — structured position records with reasoning")
app.add_typer(keys_app, name="keys", help="Unified key management across backends")
app.add_typer(telegram_app, name="telegram", help="Telegram bot — deploy agents from chat")


def main():
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies = [
[project.optional-dependencies]
llm = ["anthropic>=0.40.0"]
mcp = ["mcp>=1.0.0"]
telegram = ["python-telegram-bot>=21.0"]
dev = ["pytest>=7.0", "ruff>=0.4.0", "mypy>=1.8.0"]

[project.scripts]
Expand All @@ -55,4 +56,4 @@ disallow_untyped_defs = false
ignore_missing_imports = true

[tool.setuptools.packages.find]
include = ["cli*", "strategies*", "sdk*", "common*", "parent*", "modules*", "skills*", "quoting_engine*"]
include = ["cli*", "strategies*", "sdk*", "common*", "parent*", "modules*", "skills*", "quoting_engine*", "tg_bot*"]
8 changes: 7 additions & 1 deletion scripts/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,14 @@ def build_command() -> list[str]:
elif mode == "mcp":
return py + ["mcp", "serve", "--transport", "sse"]

elif mode == "telegram":
cmd = py + ["telegram", "start"]
if os.environ.get("HL_TESTNET", "true").lower() == "false":
cmd.append("--mainnet")
return cmd

else:
log.error("Unknown RUN_MODE: %s. Use apex, wolf, strategy, or mcp.", mode)
log.error("Unknown RUN_MODE: %s. Use apex, wolf, strategy, mcp, or telegram.", mode)
sys.exit(1)


Expand Down
1 change: 1 addition & 0 deletions tg_bot/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Telegram bot interface for deploying and controlling on-chain trading agents."""
64 changes: 64 additions & 0 deletions tg_bot/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Single-user authentication for Telegram bot."""
from __future__ import annotations

import logging
from functools import wraps
from typing import Callable

from telegram import Update
from telegram.ext import ContextTypes

log = logging.getLogger("telegram.auth")

# File to persist auto-detected chat ID
_CHAT_ID_FILE = None


def _get_chat_id_file():
from pathlib import Path
return Path.home() / ".hl-agent" / "telegram_chat_id"


def load_persisted_chat_id() -> int | None:
"""Load previously persisted chat ID from disk."""
path = _get_chat_id_file()
if path.exists():
try:
return int(path.read_text().strip())
except (ValueError, OSError):
pass
return None


def persist_chat_id(chat_id: int) -> None:
"""Save chat ID to disk for persistence across restarts."""
path = _get_chat_id_file()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(str(chat_id))


def authorized(func: Callable) -> Callable:
"""Decorator that restricts handler to allowed chat IDs.

On first interaction, if no chat IDs are configured, auto-registers the first user.
"""
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
allowed: list = context.bot_data.get("allowed_chat_ids", [])

if not allowed:
# Auto-register first user
allowed.append(chat_id)
context.bot_data["allowed_chat_ids"] = allowed
persist_chat_id(chat_id)
log.info("Auto-registered chat ID %d as authorized user", chat_id)

if chat_id not in allowed:
log.warning("Unauthorized access attempt from chat_id=%d", chat_id)
await update.message.reply_text("Unauthorized. This bot is private.")
return

return await func(update, context)

return wrapper
87 changes: 87 additions & 0 deletions tg_bot/bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Telegram bot application — entry point and handler registration."""
from __future__ import annotations

import asyncio
import logging

from telegram.ext import Application

from tg_bot.auth import load_persisted_chat_id
from tg_bot.config import TelegramBotConfig
from tg_bot.engine_bridge import EngineBridge
from tg_bot.handlers.apex import register_apex_handlers
from tg_bot.handlers.control import register_control_handlers
from tg_bot.handlers.start import build_start_handler
from tg_bot.handlers.strategy import build_deploy_handler
from tg_bot.notifier import Notifier

log = logging.getLogger("telegram.bot")


async def post_init(application: Application) -> None:
"""Called after bot is initialized but before polling starts."""
config: TelegramBotConfig = application.bot_data["config"]
event_queue: asyncio.Queue = application.bot_data["event_queue"]

# Auto-detect chat ID for notifications
chat_id = None
if config.allowed_chat_ids:
chat_id = config.allowed_chat_ids[0]
else:
persisted = load_persisted_chat_id()
if persisted:
config.allowed_chat_ids.append(persisted)
chat_id = persisted

if chat_id:
notifier = Notifier(
bot=application.bot,
chat_id=chat_id,
event_queue=event_queue,
pnl_interval_s=config.notification_interval_s,
tick_summary_interval_s=config.tick_summary_interval_s,
)
notifier.start()
application.bot_data["notifier"] = notifier
log.info("Notifier started for chat_id=%d", chat_id)
else:
log.info("No chat ID configured — notifier will start after /start")


def run_bot(config: TelegramBotConfig) -> None:
"""Build and run the Telegram bot (blocking)."""
if not config.bot_token:
raise RuntimeError("TELEGRAM_BOT_TOKEN is required. Set it in your environment.")

log.info("Starting Telegram bot (network=%s)", config.default_network)

# Create event queue for engine -> bot communication
event_queue = asyncio.Queue()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Build application
application = (
Application.builder()
.token(config.bot_token)
.post_init(post_init)
.build()
)

# Store shared state
application.bot_data["config"] = config
application.bot_data["event_queue"] = event_queue
application.bot_data["allowed_chat_ids"] = list(config.allowed_chat_ids)

# Create engine bridge
bridge = EngineBridge(event_queue=event_queue, loop=loop)
application.bot_data["engine_bridge"] = bridge

# Register handlers (order matters — ConversationHandlers first)
application.add_handler(build_start_handler())
application.add_handler(build_deploy_handler())
register_control_handlers(application)
register_apex_handlers(application)

log.info("Bot ready — polling for updates")
application.run_polling(drop_pending_updates=True)
39 changes: 39 additions & 0 deletions tg_bot/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Telegram bot configuration."""
from __future__ import annotations

import os
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TelegramBotConfig:
"""Configuration for the Telegram trading bot."""

bot_token: str = ""
allowed_chat_ids: List[int] = field(default_factory=list)
default_network: str = "testnet"
mainnet_confirmation: bool = True
notification_interval_s: int = 60
tick_summary_interval_s: int = 300
max_concurrent_agents: int = 1

@classmethod
def from_env(cls) -> "TelegramBotConfig":
token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
chat_ids_raw = os.environ.get("TELEGRAM_CHAT_ID", "")
chat_ids = []
if chat_ids_raw:
chat_ids = [int(x.strip()) for x in chat_ids_raw.split(",") if x.strip()]

network = "mainnet" if os.environ.get("HL_TESTNET", "true").lower() == "false" else "testnet"

return cls(
bot_token=token,
allowed_chat_ids=chat_ids,
default_network=network,
)

@property
def is_mainnet(self) -> bool:
return self.default_network == "mainnet"
Loading