Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 66 additions & 33 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import os, sys, time, logging, signal, requests
import os, sys, time, logging, signal, requests, threading
import schedule
from dotenv import load_dotenv
from reports.scheduler import start_eod_scheduler, run_pending
from telegram.api import send_telegram
try:
from telegram.api import telegram_long_poll_loop # new canonical name
from telegram.api import telegram_long_poll_loop, send_telegram
except ImportError: # pragma: no cover - fallback when legacy bundle lacks the helper
from telegram.api import send_telegram
telegram_long_poll_loop = None
from reports.day_report import build_day_report_text
from telegram.dispatcher import dispatch
from core.watch import make_from_env
from core.wallet_monitor import make_wallet_monitor
Expand All @@ -21,6 +22,11 @@ def start_signals_server_if_enabled():
_shutdown=False
_updates_offset=None

def _env_str(k, d=""): return os.getenv(k, d) or d
def _env_float(k, d):
try: return float(os.getenv(k, str(d)) or d)
except: return d

def _setup_logging():
level=os.getenv("LOG_LEVEL","INFO").upper()
logging.basicConfig(level=getattr(logging, level, logging.INFO),
Expand All @@ -31,62 +37,89 @@ def _handle(sig, frm):

def _legacy_telegram_long_poll_loop(handler):
global _updates_offset
try:
token=os.getenv("TELEGRAM_BOT_TOKEN","").strip()
if not token: return
url=f"https://api.telegram.org/bot{token}/getUpdates"
params={"timeout": 10}
if _updates_offset is not None:
params["offset"]=_updates_offset
r=requests.get(url, params=params, timeout=15)
resp=r.json() if r.headers.get("content-type","").startswith("application/json") else {"ok": False}
if not resp.get("ok", True):
return
for upd in resp.get("result", []):
_updates_offset = max(_updates_offset or 0, upd.get("update_id", 0) + 1)
msg=upd.get("message") or upd.get("edited_message") or {}
if not msg:
while not _shutdown:
try:
token=os.getenv("TELEGRAM_BOT_TOKEN","").strip()
if not token:
time.sleep(5)
continue
chat_id=(msg.get("chat") or {}).get("id")
text=msg.get("text")
if not text:
url=f"https://api.telegram.org/bot{token}/getUpdates"
params={"timeout": 10}
if _updates_offset is not None:
params["offset"]=_updates_offset
r=requests.get(url, params=params, timeout=15)
resp=r.json() if r.headers.get("content-type","").startswith("application/json") else {"ok": False}
if not resp.get("ok", True):
time.sleep(1)
continue
reply=handler(text, chat_id)
if reply:
send_telegram(reply)
except Exception as e:
logging.debug("telegram poll failed: %s", e)
for upd in resp.get("result", []):
_updates_offset = max(_updates_offset or 0, upd.get("update_id", 0) + 1)
msg=upd.get("message") or upd.get("edited_message") or {}
if not msg:
continue
chat_id=(msg.get("chat") or {}).get("id")
text=msg.get("text")
if not text:
continue
reply=handler(text, chat_id)
if reply:
send_telegram(reply)
except Exception as e:
logging.debug("telegram poll failed: %s", e)
time.sleep(1)
else:
time.sleep(0.1)


if telegram_long_poll_loop is None:
telegram_long_poll_loop = _legacy_telegram_long_poll_loop

def _send_daily_report():
try:
text = build_day_report_text({})
send_telegram("📒 Daily Report\n" + (text or "(empty)"))
except Exception as e:
send_telegram("⚠️ Failed to generate daily report.")
return True

def main()->int:
load_dotenv(); _setup_logging()
send_telegram("✅ Cronos DeFi Sentinel started and is online.")
try: start_eod_scheduler()
except Exception as e: logging.warning("EOD scheduler error: %s", e)
EOD_TIME = _env_str("EOD_TIME", "23:59")
schedule.every().day.at(EOD_TIME).do(_send_daily_report)
logging.info("EOD daily report scheduled for %s", EOD_TIME)
watcher=make_from_env()
wallet_mon=make_wallet_monitor(provider=fetch_wallet_txs)
try: start_signals_server_if_enabled()
except Exception as e: logging.warning("signals server error: %s", e)
signal.signal(signal.SIGTERM, _handle); signal.signal(signal.SIGINT, _handle)
holdings_refresh = int(os.getenv("HOLDINGS_REFRESH_SEC","60") or 60); last_hold=0.0
try:
threading.Thread(target=telegram_long_poll_loop, args=(dispatch,), daemon=True).start()
except Exception as e:
logging.warning("telegram long-poll thread start failed: %s", e)
holdings_refresh = _env_float("HOLDINGS_REFRESH_SEC", 60.0); last_hold=0.0
wallet=os.getenv("WALLET_ADDRESS","")
poll=int(os.getenv("WALLET_POLL","15") or 15)
poll=_env_float("WALLET_POLL", 15.0)
last_error_ts = 0.0
while not _shutdown:
t0=time.time()
try:
watcher.poll_once()
wallet_mon.poll_once()
run_pending()
schedule.run_pending()
if time.time()-last_hold >= holdings_refresh:
snap=get_wallet_snapshot(wallet) or {}
set_holdings(set(snap.keys()))
last_hold=time.time()
telegram_long_poll_loop(dispatch)
except Exception as e:
logging.exception("loop error: %s", e)
now = time.time()
if now - last_error_ts >= 120:
try:
send_telegram("⚠️ runtime error (throttled)")
except Exception:
pass
last_error_ts = now
time.sleep(max(0.5, poll - (time.time()-t0)))
return 0

Expand Down