diff --git a/core/signals/server.py b/core/signals/server.py index 1a745cb..0d822a9 100644 --- a/core/signals/server.py +++ b/core/signals/server.py @@ -1,24 +1,55 @@ +import logging import os from threading import Thread -from flask import Flask, request, jsonify + +try: + from flask import Flask, jsonify, request +except ImportError: # pragma: no cover - optional dependency + Flask = None # type: ignore[assignment] + jsonify = None # type: ignore[assignment] + request = None # type: ignore[assignment] + from core.signals.adapter import ingest_signal + + def create_app(): - app=Flask(__name__) - @app.get('/healthz') - def h(): return {'ok': True} - @app.post('/signal') + if Flask is None: + raise RuntimeError("Flask is not available; signals server cannot be started") + + app = Flask(__name__) + + @app.get("/healthz") + def h(): + return {"ok": True} + + @app.post("/signal") def s(): try: - data=request.get_json(force=True, silent=True) or {} - out=ingest_signal(data) - return jsonify({'ok': bool(out), 'action': (out or {}).get('guard_action')}) - except Exception as e: - return jsonify({'ok': False, 'error': str(e)}), 400 + data = request.get_json(force=True, silent=True) or {} + out = ingest_signal(data) + return jsonify({"ok": bool(out), "action": (out or {}).get("guard_action")}) + except Exception as e: # pragma: no cover - defensive + return jsonify({"ok": False, "error": str(e)}), 400 + return app + + def start_signals_server_if_enabled(): - enable = os.getenv('SIGNALS_HTTP','0').lower() in {'1','true','yes'} - if not enable: return None - bind=os.getenv('SIGNALS_BIND','0.0.0.0:8080'); host,port=(bind.split(':',1)+['8080'])[:2] - app=create_app() - def run(): app.run(host=host, port=int(port), debug=False, use_reloader=False) - th=Thread(target=run, daemon=True); th.start(); return th + enable = os.getenv("SIGNALS_HTTP", "0").lower() in {"1", "true", "yes"} + if not enable: + return None + + if Flask is None: + logging.warning("Signals HTTP server requested but Flask is not installed") + return None + + bind = os.getenv("SIGNALS_BIND", "0.0.0.0:8080") + host, port = (bind.split(":", 1) + ["8080"])[:2] + app = create_app() + + def run(): + app.run(host=host, port=int(port), debug=False, use_reloader=False) + + th = Thread(target=run, daemon=True) + th.start() + return th diff --git a/reports/aggregates.py b/reports/aggregates.py index ec6d256..6e97400 100644 --- a/reports/aggregates.py +++ b/reports/aggregates.py @@ -1,23 +1,75 @@ from decimal import Decimal -def _D(x): return Decimal(str(x or 0)) + + +def _dec(value) -> Decimal: + return Decimal(str(value or 0)) + + def aggregate_per_asset(entries, wallet=None): - acc={} - for e in entries: - if wallet and (e.get("wallet") or "").lower()!=wallet.lower(): continue - a=(e.get("asset") or "?").upper() - acc.setdefault(a, {"in_qty":_D(0),"out_qty":_D(0),"in_usd":_D(0),"out_usd":_D(0),"realized_usd":_D(0)}) - side=(e.get("side") or "").upper(); qty=_D(e.get("qty")); usd=_D(e.get("usd")) - if side=="IN": acc[a]["in_qty"]+=qty; acc[a]["in_usd"]+=usd - elif side=="OUT": acc[a]["out_qty"]+=qty; acc[a]["out_usd"]+=usd - acc[a]["realized_usd"]+=_D(e.get("realized_usd")) - rows=[] - for a,v in acc.items(): - netq=v["in_qty"]-v["out_qty"]; netu=v["in_usd"]-v["out_usd"] - rows.append({"asset":a, **{k:str(vv) for k,vv in v.items()}, "net_qty":str(netq), "net_usd":str(netu)}) + acc = {} + for entry in entries: + if wallet and (entry.get("wallet") or "").lower() != wallet.lower(): + continue + asset = (entry.get("asset") or "?").upper() + bucket = acc.setdefault( + asset, + { + "in_qty": Decimal("0"), + "out_qty": Decimal("0"), + "in_usd": Decimal("0"), + "out_usd": Decimal("0"), + "realized_usd": Decimal("0"), + "tx_count": 0, + }, + ) + + side = (entry.get("side") or "").upper() + qty = _dec(entry.get("qty")) + usd = _dec(entry.get("usd")) + if side == "IN": + bucket["in_qty"] += qty + bucket["in_usd"] += usd + elif side == "OUT": + bucket["out_qty"] += qty + bucket["out_usd"] += usd + + bucket["realized_usd"] += _dec(entry.get("realized_usd")) + bucket["tx_count"] += 1 + + rows = [] + for asset, values in acc.items(): + net_qty = values["in_qty"] - values["out_qty"] + net_usd = values["in_usd"] - values["out_usd"] + rows.append( + { + "asset": asset, + "in_qty": values["in_qty"], + "out_qty": values["out_qty"], + "in_usd": values["in_usd"], + "out_usd": values["out_usd"], + "realized_usd": values["realized_usd"], + "tx_count": values["tx_count"], + "net_qty": net_qty, + "net_usd": net_usd, + } + ) + return rows + + def totals(rows): - from decimal import Decimal - t={k:Decimal("0") for k in ["in_qty","out_qty","in_usd","out_usd","net_qty","net_usd","realized_usd"]} - for r in rows: - for k in t: t[k]+=Decimal(str(r.get(k) or 0)) - return {k:str(v) for k,v in t.items()} + totals_map = { + "in_qty": Decimal("0"), + "out_qty": Decimal("0"), + "in_usd": Decimal("0"), + "out_usd": Decimal("0"), + "net_qty": Decimal("0"), + "net_usd": Decimal("0"), + "realized_usd": Decimal("0"), + } + + for row in rows: + for key in totals_map: + totals_map[key] += _dec(row.get(key)) + + return totals_map