diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a44cbe --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd + +# Virtual environments +.env +.venv +venv/ +ENV/ +env/ + +# Distribution / packaging +*.egg-info/ +*.egg +*.zip + +# OS files +.DS_Store +Thumbs.db + +# Logs and artifacts +data/raw/*.tmp +data/artifacts/*.log diff --git a/cerberus/LICENSE b/cerberus/LICENSE new file mode 100644 index 0000000..9d9bcc3 --- /dev/null +++ b/cerberus/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Project Cerberus + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/cerberus/README.md b/cerberus/README.md new file mode 100644 index 0000000..8e91a90 --- /dev/null +++ b/cerberus/README.md @@ -0,0 +1,103 @@ +# Project Cerberus + +Project Cerberus is a research-focused algorithmic trading stack that demonstrates a +survivorship-aware, look-ahead-safe equity momentum + profitability strategy with +institutional-style plumbing. The repository contains utilities for data acquisition, +backtesting, risk management, paper trading, and interactive monitoring via Streamlit. + +> **Disclaimer:** This project is for research and educational purposes only. It does +> not constitute investment advice, an offer, or solicitation to buy or sell securities. + +## Features +- Survivorship-aware S&P 500 universe with 90-day-lagged fundamentals in free mode. +- Monthly momentum (12-1 and 6-1 variants) combined with profitability filters. +- Randomized month-end rebalances, tiered transaction costs, capacity constraints, + volatility-regime sizing, tax-drag toggle, and risk kill switches. +- Robust evaluation metrics including Deflated Sharpe Ratio and CSCV-style PBO. +- Paper trading integration with Alpaca and fill-quality logging. +- Streamlit dashboard with performance visualization and alerting. +- Clean abstraction between the strategy engine and data providers to allow a swap-in + paid data source (Tiingo/Sharadar) without touching strategy logic. + +## Installation + +```bash +python -m venv .venv +# macOS/Linux +source .venv/bin/activate +# Windows PowerShell +.venv\\Scripts\\Activate.ps1 +pip install -r requirements.txt +``` + +## Configuration + +All runtime configuration lives in `conf/cerberus.yaml`. Key sections: + +- `data`: Source selection (`free`, `tiingo`, `sharadar`), start date, caching options, + and optional ticker overrides for fast iteration. +- `portfolio`: Number of holdings and per-name weight and ADV caps. +- `rebalance`: Random rebalance window and reproducibility seed. 
+- `signals`: Momentum lookback/skip structure and robustness grids for PBO. +- `risk`: Regime filters using VIX/realized volatility. +- `costs`: Tiered transaction cost settings by %ADV bucket. +- `tax`: Short-term capital gains drag toggle and rate. +- `evaluation`: Required performance gates and PBO sampling density. +- `paper`: Alpaca paper trading configuration. +- `dashboard`: Alert thresholds for drawdown and slippage. + +## Commands + +All commands are executed from the repository root: + +```bash +python -m src.cli load # Download data for the configured provider +python -m src.cli backtest # Run the Cerberus v2 backtest and write artifacts +python -m src.cli dual # Run the ETF dual-momentum demo +python -m src.cli paper --date YYYY-MM-DD # Execute a paper-trade rebalance via Alpaca +streamlit run src/dashboard.py # Launch the monitoring dashboard +``` + +## Data Model + +The data loaders return five aligned panels: + +1. `adj_close_daily` – Adjusted close prices by trading day and ticker. +2. `volume_daily` – Corresponding daily share volume. +3. `gp_monthly_lag90` – Gross profit per ticker, lagged 90 days and sampled monthly. +4. `assets_monthly_lag90` – Total assets with the same lag/alignment. +5. `sp500_membership_monthly` – Boolean panel of S&P 500 membership by month end. + +These panels are persisted to CSV in `data/raw/` and consumed by the strategy engine. + +## Performance Gates + +Backtests are expected to meet the following minimums before capital deployment: + +- Annualized Sharpe Ratio ≥ 1.0 +- Calmar Ratio ≥ 0.5 +- Deflated Sharpe Ratio > 0 +- CSCV-style Probability of Backtest Overfitting (PBO) < 0.5 + +If the backtest fails to meet the Sharpe or Calmar thresholds the CLI will exit with +an error to prevent accidental promotion to paper/live trading. + +## Swapping Data Providers + +The strategy interacts with the data layer through a stable interface returning the +five panels listed above. 
To upgrade to Tiingo or Sharadar, implement the TODOs in +`src/data_tiingo.py` or `src/data_sharadar.py` and update `data.mode` in the config. +No changes are required in the strategy or evaluation code. + +## Known Limitations + +- Free-mode fundamentals rely on Yahoo Finance reporting dates, which may differ from + official SEC filing dates. A 90-day lag is applied as a conservative approximation. +- Historical S&P 500 membership from community-maintained sources may include small + errors. Validation guards ensure monthly membership counts remain within reasonable + bounds, but institutional users should license high-quality datasets for production. +- Yahoo Finance occasionally rate-limits requests; caching is enabled by default to + avoid repeated downloads. + +Despite these limitations, the free stack provides a solid foundation for iterative +research before investing in commercial data feeds. diff --git a/cerberus/conf/cerberus.yaml b/cerberus/conf/cerberus.yaml new file mode 100644 index 0000000..b98214c --- /dev/null +++ b/cerberus/conf/cerberus.yaml @@ -0,0 +1,42 @@ +data: + mode: "free" + start: "2000-01-01" + sp500_changes_csv: "https://raw.githubusercontent.com/fja05680/sp500/master/S%26P%20500%20Historical%20Components%20%26%20Changes(08-15-2024).csv" + cache_raw: true + cache_interim: true + tickers_override: [] +portfolio: + top_n: 30 + max_pos_weight: 0.10 + adv_cap_pct: 0.05 +rebalance: + window_last_days: 3 + seed: 42 +signals: + lookback_months: 12 + skip_months: 1 + robustness_lookbacks: [6, 12] + robustness_topn: [20, 25, 30] +risk: + use_vix_filter: true + vix_threshold: 30 + realized_vol_threshold: 0.25 +costs: + small_threshold: 0.01 + med_threshold: 0.02 + rate_small_bps: 10 + rate_med_bps: 20 + rate_large_bps: 50 +tax: + apply_tax_drag: false + stcg_rate: 0.35 +evaluation: + pbo_trials: 10 + min_sharpe: 1.0 + min_calmar: 0.5 +paper: + alpaca_base_url: "https://paper-api.alpaca.markets" + symbols_blocklist: [] +dashboard: + dd_alert: 
-0.20 + slip_alert_multiplier: 4 diff --git a/cerberus/pyproject.toml b/cerberus/pyproject.toml new file mode 100644 index 0000000..98cc9de --- /dev/null +++ b/cerberus/pyproject.toml @@ -0,0 +1,15 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[tool.black] +line-length = 100 +target-version = ["py310"] + +[tool.ruff] +line-length = 100 +select = ["E", "F", "B", "I"] +ignore = ["E501"] + +[tool.pytest.ini_options] +addopts = "-q" diff --git a/cerberus/requirements.txt b/cerberus/requirements.txt new file mode 100644 index 0000000..16a3b6e --- /dev/null +++ b/cerberus/requirements.txt @@ -0,0 +1,11 @@ +pandas>=2.0 +numpy>=1.24 +yfinance>=0.2 +requests>=2.31 +beautifulsoup4>=4.12 +lxml>=4.9 +streamlit>=1.31 +pyyaml>=6.0 +typer>=0.9 +scipy>=1.10 +pytest>=7.4 diff --git a/cerberus/src/__init__.py b/cerberus/src/__init__.py new file mode 100644 index 0000000..be46210 --- /dev/null +++ b/cerberus/src/__init__.py @@ -0,0 +1,64 @@ +"""Top-level package for Project Cerberus. + +This module exposes convenience attributes so that downstream code can +import the major Cerberus components without drilling into the package +structure. The heavy imports are intentionally avoided at module import +so that clients can cherry-pick submodules on demand without incurring +side effects (e.g., network calls during configuration loading). +""" + +from __future__ import annotations + +from importlib import import_module +from typing import Any + +__all__ = [ + "get_module", + "AVAILABLE_MODULES", +] + +# Mapping of friendly names to import paths used by :func:`get_module`. 
# Registry consulted by :func:`get_module`. Keys are stable friendly names;
# values are the fully-qualified module paths that are imported lazily.
AVAILABLE_MODULES = {
    "cli": "cerberus.src.cli",
    "strategy": "cerberus.src.strategy",
    "metrics": "cerberus.src.metrics",
    "data_free": "cerberus.src.data_free",
    "data_tiingo": "cerberus.src.data_tiingo",
    "data_sharadar": "cerberus.src.data_sharadar",
    "dual_momentum": "cerberus.src.dual_momentum",
    "paper_alpaca": "cerberus.src.paper_alpaca",
    "utils.io": "cerberus.src.utils.io",
    "utils.dates": "cerberus.src.utils.dates",
    "utils.logging": "cerberus.src.utils.logging",
}


def get_module(name: str) -> Any:
    """Resolve a friendly name to its Cerberus submodule, importing lazily.

    Parameters
    ----------
    name:
        Key registered in :data:`AVAILABLE_MODULES`.

    Returns
    -------
    Any
        The imported module object.

    Raises
    ------
    KeyError
        If *name* is not registered.
    ImportError
        If the underlying module cannot be imported.
    """

    try:
        target = AVAILABLE_MODULES[name]
    except KeyError as err:  # pragma: no cover - defensive guard
        raise KeyError(
            f"Unknown Cerberus module '{name}'. Available keys: {sorted(AVAILABLE_MODULES)}"
        ) from err

    return import_module(target)
def _load_cached_panels() -> Dict[str, pd.DataFrame]:
    """Read all cached data panels from ``data/raw/``.

    Returns
    -------
    Dict[str, pd.DataFrame]
        Mapping of panel key (see ``data_free.PANEL_FILENAMES``) to frame.

    Raises
    ------
    FileNotFoundError
        If any expected panel CSV is missing; run ``python -m src.cli load``
        first to populate the cache.
    """

    panels: Dict[str, pd.DataFrame] = {}
    for key, filename in data_free.PANEL_FILENAMES.items():
        path = ROOT / filename
        if not path.exists():
            # Name the missing panel and its path so the user knows exactly
            # what to regenerate (the original message said "(unknown)").
            raise FileNotFoundError(
                f"Missing cached panel '{key}' at {path}. "
                "Run `python -m src.cli load` first."
            )
        panels[key] = read_panel_csv(path)
    return panels
@app.command()
def backtest() -> None:
    """Run the Cerberus v2 backtest.

    Loads the cached panels, runs the strategy, writes per-series artifacts
    to ``data/artifacts/``, and prints the evaluation summary.
    ``_print_summary`` raises ``typer.Exit(code=1)`` when the Sharpe/Calmar
    gates fail, so a failing run exits non-zero before the success message.
    """

    config = load_config()
    panels = _load_cached_panels()
    result = run_backtest(panels, config)
    _write_artifacts(result)
    # Also enforces the evaluation gates (may exit with code 1).
    _print_summary(result.performance, config, panels)
    typer.secho("Backtest complete.", fg=typer.colors.GREEN)
def _load_series(name: str) -> pd.Series | None:
    """Load ``data/artifacts/<name>.csv`` as a Series, or ``None`` if absent.

    Single-column frames are returned as that column; anything wider is
    collapsed with ``DataFrame.squeeze``.
    """

    csv_path = ARTIFACT_DIR / f"{name}.csv"
    if not csv_path.exists():
        return None
    frame = pd.read_csv(csv_path, index_col=0, parse_dates=True)
    return frame.iloc[:, 0] if frame.shape[1] == 1 else frame.squeeze()
Run `python -m src.cli backtest` first.") + return + + drawdown = equity / equity.cummax() - 1 + + col1, col2 = st.columns(2) + with col1: + st.subheader("Equity Curve") + st.line_chart(equity) + with col2: + st.subheader("Drawdown") + st.area_chart(drawdown) + + col3, col4 = st.columns(2) + with col3: + st.subheader("Rolling 6M Sharpe") + if roll_sharpe is not None: + st.line_chart(roll_sharpe) + else: + st.info("Rolling Sharpe artifact missing.") + with col4: + st.subheader("Turnover & Costs") + if turnover is not None: + st.bar_chart(turnover) + if tcost is not None: + st.bar_chart(tcost) + + st.subheader("Alerts") + alerts = [] + config_path = Path(__file__).resolve().parents[1] / "conf" / "cerberus.yaml" + dd_alert = -0.2 + slip_alert_multiplier = 4 + if config_path.exists(): + import yaml + + with open(config_path, "r", encoding="utf-8") as fh: + cfg = yaml.safe_load(fh) + dd_alert = cfg.get("dashboard", {}).get("dd_alert", dd_alert) + slip_alert_multiplier = cfg.get("dashboard", {}).get("slip_alert_multiplier", slip_alert_multiplier) + + if drawdown.iloc[-1] <= dd_alert: + alerts.append(f"Drawdown breach: {drawdown.iloc[-1]:.1%} <= {dd_alert:.1%}") + + if roll_sharpe is not None: + negative_streak = (roll_sharpe < 0).rolling(6).sum() + if not negative_streak.dropna().empty and negative_streak.dropna().iloc[-1] == 6: + alerts.append("Rolling 6-month Sharpe negative for 6 consecutive months") + + fills_path = ARTIFACT_DIR / "fills.csv" + if fills_path.exists(): + fills = pd.read_csv(fills_path) + if not fills.empty: + recent = fills.tail(10) + breach = recent[recent["slip_bps"] > slip_alert_multiplier * 20] + for _, row in breach.iterrows(): + alerts.append( + f"Slippage alert {row['symbol']}: {row['slip_bps']:.1f} bps > {slip_alert_multiplier * 20:.1f} bps" + ) + st.dataframe(recent) + + if alerts: + for alert in alerts: + st.error(alert) + else: + st.success("No alerts triggered.") + + +if __name__ == "__main__": + main() diff --git 
a/cerberus/src/data_free.py b/cerberus/src/data_free.py new file mode 100644 index 0000000..b9cffb1 --- /dev/null +++ b/cerberus/src/data_free.py @@ -0,0 +1,315 @@ +"""Free data stack built on top of Yahoo Finance. + +The functions in this module implement the survivorship-aware S&P 500 universe, +price/volume history, and lagged fundamentals required by the Cerberus strategy. +The loader honours the interface shared with the paid data adapters so callers can +swap implementations without modifying strategy code. +""" + +from __future__ import annotations + +import io +import re +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Sequence + +import numpy as np +import pandas as pd +import requests +import yfinance as yf + +from .utils.io import read_panel_csv, write_panel_csv +from .utils.logging import get_logger + +LOGGER = get_logger("data.free") + + +@dataclass(slots=True) +class FreeDataConfig: + """Typed view over the ``data`` section of the YAML configuration.""" + + start: datetime + sp500_changes_csv: str + cache_raw: bool + cache_interim: bool + tickers_override: Sequence[str] + + +PANEL_FILENAMES = { + "adj_close_daily": Path("data/raw/adj_close_daily.csv"), + "volume_daily": Path("data/raw/volume_daily.csv"), + "gp_monthly_lag90": Path("data/raw/gp_monthly.csv"), + "assets_monthly_lag90": Path("data/raw/assets_monthly.csv"), + "sp500_membership_monthly": Path("data/raw/sp500_membership_monthly.csv"), +} + + +def _normalize_ticker(ticker: str) -> str: + return ticker.strip().upper().replace(".", "-") + + +def _parse_ticker_field(field: object) -> list[str]: + if field is None or (isinstance(field, float) and np.isnan(field)): + return [] + if isinstance(field, (list, tuple, set)): + return [_normalize_ticker(str(t)) for t in field if str(t).strip()] + text = str(field) + if not text.strip(): + return [] + # Many datasets separate tickers using commas or spaces. 
def _fetch_current_sp500() -> set[str]:
    """Return the present-day S&P 500 constituents scraped from Wikipedia."""

    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    LOGGER.info("Fetching current S&P 500 membership from %s", url)
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    tables = pd.read_html(response.text)
    if not tables:
        raise RuntimeError("Failed to parse S&P 500 table from Wikipedia")
    constituents = tables[0]
    # The column header has varied over time; accept any known variant.
    symbol_col = next(
        (col for col in ("Symbol", "Ticker", "symbol") if col in constituents.columns),
        None,
    )
    if symbol_col is None:
        raise RuntimeError("Could not locate ticker column in Wikipedia table")
    return {_normalize_ticker(sym) for sym in constituents[symbol_col].tolist()}
row.get("ticker_removed")) + explicit = _parse_ticker_field(row.get("tickers")) + if explicit: + current_set = set(explicit) + current_set.update(added) + current_set.difference_update(removed) + membership_records[date] = set(current_set) + + if not membership_records: + raise RuntimeError("No membership records parsed from the CSV") + + end_date = max(max(membership_records), pd.Timestamp.today().normalize()) + monthly_index = pd.date_range(start=start, end=end_date, freq="M") + all_tickers = sorted({t for members in membership_records.values() for t in members}) + membership = pd.DataFrame(False, index=monthly_index, columns=all_tickers) + sorted_dates = sorted(membership_records) + pointer = 0 + last_members: set[str] = set() + for month_end in membership.index: + while pointer < len(sorted_dates) and sorted_dates[pointer] <= month_end: + last_members = membership_records[sorted_dates[pointer]] + pointer += 1 + if last_members: + membership.loc[month_end, list(last_members)] = True + membership = membership.ffill().fillna(False) + membership.index.name = "month_end" + membership = membership.astype(bool) + return membership + + +def build_sp500_membership(config: FreeDataConfig) -> pd.DataFrame: + changes = _fetch_sp500_changes(config.sp500_changes_csv) + membership = _construct_membership_panel(changes, config.start) + latest_month = membership.index.max() + current_members = _fetch_current_sp500() + for ticker in current_members: + if ticker not in membership.columns: + membership[ticker] = False + membership.loc[latest_month, ticker] = True + membership = membership.sort_index().sort_index(axis=1) + membership = membership.ffill().fillna(False).astype(bool) + + counts = membership.sum(axis=1) + within_bounds = counts.between(450, 550) + if within_bounds.mean() < 0.9: + raise RuntimeError( + "Membership validation failed: <90% of months contain between 450 and 550 constituents." 
+ ) + return membership + + +def _chunked(iterable: Sequence[str], size: int) -> Sequence[Sequence[str]]: + return [iterable[i : i + size] for i in range(0, len(iterable), size)] + + +def _download_prices_volumes(tickers: Sequence[str], start: datetime) -> tuple[pd.DataFrame, pd.DataFrame]: + LOGGER.info("Downloading price/volume history for %d tickers", len(tickers)) + frames_close = [] + frames_volume = [] + for batch in _chunked(list(tickers), 50): + data = yf.download( + tickers=" ".join(batch), + start=start, + auto_adjust=True, + progress=False, + group_by="ticker", + ) + if data.empty: + continue + if isinstance(data.columns, pd.MultiIndex): + closes = data.loc[:, (slice(None), "Close")] + volumes = data.loc[:, (slice(None), "Volume")] + closes.columns = [col[0] for col in closes.columns] + volumes.columns = [col[0] for col in volumes.columns] + else: + closes = data[["Close"]] + closes.columns = batch + volumes = data[["Volume"]] + volumes.columns = batch + frames_close.append(closes) + frames_volume.append(volumes) + if not frames_close: + raise RuntimeError("No price data downloaded; check ticker list or network connectivity.") + close = pd.concat(frames_close, axis=1).sort_index().sort_index(axis=1) + volume = pd.concat(frames_volume, axis=1).sort_index().sort_index(axis=1) + close.index.name = "date" + volume.index.name = "date" + return close, volume + + +_METRIC_PATTERNS = { + "gross_profit": re.compile(r"gross.*profit", re.IGNORECASE), + "total_assets": re.compile(r"total.*asset", re.IGNORECASE), +} + + +def _extract_metric(table: pd.DataFrame, pattern: re.Pattern[str]) -> pd.Series: + if table is None or table.empty: + return pd.Series(dtype=float) + labels = {str(idx): idx for idx in table.index} + for label, original in labels.items(): + if pattern.search(label.lower()): + series = table.loc[original] + if isinstance(series, pd.DataFrame): + series = series.squeeze() + series.index = pd.to_datetime(series.index) + series.name = str(original) + 
return series + return pd.Series(dtype=float) + + +def _download_fundamentals(tickers: Sequence[str]) -> tuple[pd.DataFrame, pd.DataFrame]: + LOGGER.info("Downloading quarterly fundamentals for %d tickers", len(tickers)) + gp_series = {} + assets_series = {} + for ticker in tickers: + info = yf.Ticker(ticker) + fin = info.quarterly_financials + bs = info.quarterly_balance_sheet + gp = _extract_metric(fin, _METRIC_PATTERNS["gross_profit"]) + assets = _extract_metric(bs, _METRIC_PATTERNS["total_assets"]) + if not gp.empty: + gp_series[ticker] = gp + if not assets.empty: + assets_series[ticker] = assets + if not gp_series or not assets_series: + raise RuntimeError("Failed to download fundamentals for the provided tickers.") + gp_df = pd.concat(gp_series, axis=1).sort_index() + assets_df = pd.concat(assets_series, axis=1).sort_index() + gp_df.columns = [c.upper() for c in gp_df.columns] + assets_df.columns = [c.upper() for c in assets_df.columns] + return gp_df, assets_df + + +def _apply_lookahead_lag(df: pd.DataFrame, lag_days: int = 90) -> pd.DataFrame: + lagged = df.copy() + lagged.index = pd.to_datetime(lagged.index) + pd.Timedelta(days=lag_days) + monthly = lagged.resample("M").last() + return monthly + + +def load_free_data(config: dict, root: Path) -> dict[str, pd.DataFrame]: + """Entry point used by the CLI and tests.""" + + data_cfg = FreeDataConfig( + start=pd.Timestamp(config["data"]["start"]).to_pydatetime(), + sp500_changes_csv=config["data"]["sp500_changes_csv"], + cache_raw=bool(config["data"].get("cache_raw", True)), + cache_interim=bool(config["data"].get("cache_interim", True)), + tickers_override=[_normalize_ticker(t) for t in config["data"].get("tickers_override", [])], + ) + + cached_frames: dict[str, pd.DataFrame] = {} + if data_cfg.cache_raw: + for key, path in PANEL_FILENAMES.items(): + absolute = root / path + if absolute.exists(): + LOGGER.info("Loading cached panel %s", key) + cached_frames[key] = read_panel_csv(absolute) + if 
len(cached_frames) == len(PANEL_FILENAMES): + return cached_frames + + membership = build_sp500_membership(data_cfg) + tickers = sorted(membership.columns) + if data_cfg.tickers_override: + tickers = sorted(set(data_cfg.tickers_override)) + missing = set(tickers) - set(membership.columns) + if missing: + LOGGER.warning("Ticker override contains %d names missing from membership", len(missing)) + for ticker in missing: + membership[ticker] = False + membership = membership.loc[:, sorted(tickers)] + LOGGER.warning("Using ticker override subset of %d names", len(tickers)) + + adj_close, volume = _download_prices_volumes(tickers, data_cfg.start) + gp, assets = _download_fundamentals(tickers) + + gp_monthly = _apply_lookahead_lag(gp) + assets_monthly = _apply_lookahead_lag(assets) + + outputs = { + "adj_close_daily": adj_close, + "volume_daily": volume, + "gp_monthly_lag90": gp_monthly, + "assets_monthly_lag90": assets_monthly, + "sp500_membership_monthly": membership, + } + + for key, frame in outputs.items(): + path = root / PANEL_FILENAMES[key] + LOGGER.info("Writing panel %s to %s", key, path) + if key == "sp500_membership_monthly": + write_panel_csv(frame.astype(int), path, float_format="%.0f") + else: + write_panel_csv(frame, path) + + return outputs + + +__all__ = ["load_free_data", "build_sp500_membership"] diff --git a/cerberus/src/data_sharadar.py b/cerberus/src/data_sharadar.py new file mode 100644 index 0000000..1d1a9c2 --- /dev/null +++ b/cerberus/src/data_sharadar.py @@ -0,0 +1,41 @@ +"""Sharadar data loader stub. + +The Sharadar implementation mirrors the free loader contract while sourcing data +from Quandl's premium Sharadar bundles (SEP/SF1). Follow the documented steps when +migrating to the paid feed. 
def load_sharadar_data(config: dict, root: Path) -> Dict[str, pd.DataFrame]:
    """Load the five Cerberus panels from Sharadar's SEP/SF1 datasets.

    This is a documented stub; the paid integration must cover:

    1. **Authentication** – read the Quandl API key (``QUANDL_API_KEY``) and
       build a resilient client with retry/backoff behaviour.
    2. **Universe** – derive S&P 500 membership from the SEP constituent
       history (or its changes table), normalising tickers exactly as the
       free loader does (``BRK.B`` → ``BRK-B``).
    3. **Prices/Volume** – pull split/dividend-adjusted ``close`` and
       ``volume`` from the SEP daily bundle, aligned to a trading-day index
       shared by all tickers.
    4. **Fundamentals** – query SF1 quarterly gross profit (``GPQ``) and total
       assets (``ATQ``), applying a minimum 90-day reporting lag before
       monthly sampling to avoid look-ahead bias.
    5. **Output contract** – return the exact five panels the strategy
       expects and persist them to ``data/raw/`` when caching is enabled.
    """

    raise NotImplementedError(
        "Sharadar loader is not yet implemented. See module docstring for integration steps."
    )
+""" + +from __future__ import annotations + +from pathlib import Path +from typing import Dict + +import pandas as pd + + +def load_tiingo_data(config: dict, root: Path) -> Dict[str, pd.DataFrame]: + """Load Cerberus panels using Tiingo's API. + + TODO: Implement the following steps when upgrading to Tiingo: + + 1. **Authentication** – read the Tiingo API token from an environment variable + (e.g. ``TIINGO_API_TOKEN``) and configure the ``requests`` session headers. + 2. **Universe** – query the S&P 500 membership history via Tiingo's reference + data (``tiingo/fundamentals``) or maintain a local copy of their constituents + dataset. Ensure the output matches the boolean monthly panel contract used by + :mod:`src.data_free`. + 3. **Prices & Volume** – call the Tiingo daily prices endpoint (``tiingo/daily``) + with the ``adjustedClose`` field, retrieving both adjusted close and volume for + the entire universe. Align the output to a common trading-day index. + 4. **Fundamentals** – download quarterly ``grossProfit`` and ``totalAssets`` from + Tiingo Fundamentals (``tiingo/fundamentals``). Apply a strict 90-day lag before + resampling to month end to preserve look-ahead safety. + 5. **Caching** – honour the ``cache_raw`` flag by reading/writing identical CSVs as + the free loader, allowing seamless switching through the configuration file. + + The function must return a dictionary with the same five keys as + :func:`src.data_free.load_free_data`: + + ``adj_close_daily``, ``volume_daily``, ``gp_monthly_lag90``, + ``assets_monthly_lag90``, and ``sp500_membership_monthly``. + """ + + raise NotImplementedError("Tiingo loader is not yet implemented. 
See module docstring for steps.") + + +__all__ = ["load_tiingo_data"] diff --git a/cerberus/src/dual_momentum.py b/cerberus/src/dual_momentum.py new file mode 100644 index 0000000..5027b1a --- /dev/null +++ b/cerberus/src/dual_momentum.py @@ -0,0 +1,52 @@ +"""Dual momentum ETF rotation: SPY vs IEF vs BIL.""" + +from __future__ import annotations + +import pandas as pd + + +def dual_momentum_returns(prices: pd.DataFrame, lookback: int = 12) -> pd.Series: + """Compute monthly returns of the dual-momentum rotation. + + Parameters + ---------- + prices: + Daily adjusted close prices for SPY, IEF, and BIL. + lookback: + Lookback window in months for the absolute momentum signal. + """ + + required = {"SPY", "IEF", "BIL"} + missing = required - set(prices.columns) + if missing: + raise ValueError(f"Prices must contain {required}, missing {missing}") + + monthly_prices = prices.resample("M").last().dropna() + monthly_returns = monthly_prices.pct_change().dropna(how="all") + rel_momentum = monthly_prices.pct_change(lookback) + + positions = [] + index = monthly_returns.index + for date in index: + signal_date = index[index < date].max() if (index < date).any() else None + if signal_date is None: + positions.append("BIL") + continue + spy_signal = rel_momentum.loc[signal_date, "SPY"] + if pd.notna(spy_signal) and spy_signal > 0: + positions.append("SPY") + else: + ief_signal = rel_momentum.loc[signal_date, "IEF"] if "IEF" in rel_momentum.columns else None + positions.append("IEF" if pd.notna(ief_signal) else "BIL") + + weights = pd.Series(positions, index=index) + strat_returns = [] + for date in index: + asset = weights.loc[date] + ret = monthly_returns.loc[date, asset] if asset in monthly_returns.columns else 0.0 + strat_returns.append(ret) + strategy_series = pd.Series(strat_returns, index=index, name="dual_momentum_ret") + return strategy_series + + +__all__ = ["dual_momentum_returns"] diff --git a/cerberus/src/metrics.py b/cerberus/src/metrics.py new file mode 100644 
index 0000000..3c33191 --- /dev/null +++ b/cerberus/src/metrics.py @@ -0,0 +1,115 @@ +"""Performance analytics and overfitting diagnostics.""" + +from __future__ import annotations + +import math +from typing import Iterable, Sequence + +import numpy as np +import pandas as pd +from scipy import stats + + +def ann_return(returns: pd.Series, periods_per_year: int = 12) -> float: + compounded = (1 + returns).prod() + n = len(returns) + if n == 0: + return float("nan") + return float(compounded ** (periods_per_year / n) - 1) + + +def ann_vol(returns: pd.Series, periods_per_year: int = 12) -> float: + if len(returns) < 2: + return float("nan") + return float(returns.std(ddof=1) * math.sqrt(periods_per_year)) + + +def sharpe_ann(returns: pd.Series, risk_free: float = 0.0, periods_per_year: int = 12) -> float: + excess = returns - risk_free / periods_per_year + vol = ann_vol(excess, periods_per_year) + if vol == 0 or np.isnan(vol): + return float("nan") + return ann_return(excess, periods_per_year) / vol + + +def max_drawdown(returns: pd.Series) -> float: + equity = (1 + returns).cumprod() + peak = equity.cummax() + dd = equity / peak - 1 + return float(dd.min()) + + +def calmar(returns: pd.Series, periods_per_year: int = 12) -> float: + dd = max_drawdown(returns) + if dd == 0: + return float("nan") + return ann_return(returns, periods_per_year) / abs(dd) + + +def hit_rate(returns: pd.Series) -> float: + if len(returns) == 0: + return float("nan") + return float((returns > 0).sum() / len(returns)) + + +def deflated_sharpe(returns: pd.Series, n_trials: int, periods_per_year: int = 12) -> float: + """Approximate Deflated Sharpe Ratio following López de Prado (2018).""" + + n = len(returns) + if n < 3: + return float("nan") + sr = sharpe_ann(returns, periods_per_year=periods_per_year) + if np.isnan(sr): + return float("nan") + skew = stats.skew(returns, bias=False) + kurt = stats.kurtosis(returns, fisher=True, bias=False) + sigma_sr = math.sqrt((1 - skew * sr + (kurt - 1) * 
(sr ** 2) / 4) / (n - 1)) + z = stats.norm.ppf(1 - 1 / max(n_trials, 1)) + return float(sr - sigma_sr * z) + + +def simple_cscv_pbo(returns_variants: Sequence[pd.Series]) -> float: + """Simplified CSCV-style Probability of Backtest Overfitting. + + The implementation splits the series into two contiguous halves, treating each + half as out-of-sample once. For each split the best in-sample Sharpe is tested + against the out-of-sample median Sharpe. The returned probability is the share of + splits where the best in-sample choice underperforms the median out-of-sample. + """ + + if not returns_variants: + return float("nan") + df = pd.DataFrame({i: s for i, s in enumerate(returns_variants)}).dropna() + if df.empty: + return float("nan") + mid = len(df) // 2 + if mid < 12: + mid = len(df) // 3 + if mid <= 0 or mid >= len(df): + return 0.0 + folds = [ + (df.iloc[:mid], df.iloc[mid:]), + (df.iloc[mid:], df.iloc[:mid]), + ] + overfits = 0 + for train, test in folds: + train_sharpes = train.apply(sharpe_ann) + best_idx = train_sharpes.idxmax() + test_sharpes = test.apply(sharpe_ann) + if np.isnan(test_sharpes[best_idx]): + continue + if test_sharpes[best_idx] < np.nanmedian(test_sharpes.values): + overfits += 1 + return overfits / len(folds) + + +__all__ = [ + "ann_return", + "ann_vol", + "sharpe_ann", + "max_drawdown", + "calmar", + "hit_rate", + "deflated_sharpe", + "simple_cscv_pbo", +] diff --git a/cerberus/src/paper_alpaca.py b/cerberus/src/paper_alpaca.py new file mode 100644 index 0000000..84fec16 --- /dev/null +++ b/cerberus/src/paper_alpaca.py @@ -0,0 +1,155 @@ +"""Alpaca paper-trading integration with slippage logging.""" + +from __future__ import annotations + +import os +import time +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Dict + +import pandas as pd +import requests + +from .utils.io import ensure_parent +from .utils.logging import get_logger + +LOGGER = get_logger("paper") + + +@dataclass 
+class OrderFill: + symbol: str + side: str + qty: float + expected_px: float + filled_avg_px: float + slip_bps: float + timestamp: datetime + + +class AlpacaPaperTrader: + """Thin wrapper around the Alpaca Paper API.""" + + def __init__(self, base_url: str) -> None: + key = os.environ.get("ALPACA_API_KEY_ID") + secret = os.environ.get("ALPACA_API_SECRET_KEY") + if not key or not secret: + raise EnvironmentError( + "Missing Alpaca credentials. Set ALPACA_API_KEY_ID and ALPACA_API_SECRET_KEY." + ) + self.base_url = base_url.rstrip("/") + self.session = requests.Session() + self.session.headers.update( + { + "APCA-API-KEY-ID": key, + "APCA-API-SECRET-KEY": secret, + "Content-Type": "application/json", + } + ) + + def _request(self, method: str, path: str, **kwargs) -> dict: + response = self.session.request(method, f"{self.base_url}{path}", timeout=30, **kwargs) + if not response.ok: + LOGGER.error("Alpaca API error %s: %s", response.status_code, response.text) + response.raise_for_status() + return response.json() + + def submit_order(self, symbol: str, qty: float, side: str, expected_px: float) -> dict: + payload = { + "symbol": symbol, + "qty": abs(int(qty)), + "side": side, + "type": "market", + "time_in_force": "day", + "extended_hours": False, + } + LOGGER.info("Submitting order %s %s x %s", side, qty, symbol) + order = self._request("POST", "/v2/orders", json=payload) + order["expected_px"] = expected_px + return order + + def poll_fill(self, order_id: str) -> dict: + for _ in range(30): + order = self._request("GET", f"/v2/orders/{order_id}") + if order.get("status") in {"filled", "canceled", "rejected"}: + return order + time.sleep(2) + LOGGER.warning("Order %s did not fill within polling window", order_id) + return order + + +def execute_rebalance( + trader: AlpacaPaperTrader, + target_weights: pd.Series, + prices: pd.Series, + *, + notional: float, + slip_alert_multiplier: float, + modeled_medium_bps: float, + blocklist: list[str] | None = None, + 
artifacts_dir: Path | None = None, +) -> list[OrderFill]: + """Submit orders to achieve ``target_weights`` and log fill quality.""" + + block = {b.upper() for b in (blocklist or [])} + fills: list[OrderFill] = [] + for symbol, weight in target_weights.items(): + if weight == 0 or symbol in block: + continue + price = prices.get(symbol) + if price is None or pd.isna(price): + LOGGER.warning("Skipping %s due to missing price", symbol) + continue + qty = weight * notional / price + side = "buy" if qty > 0 else "sell" + order = trader.submit_order(symbol, qty, side, price) + final_order = trader.poll_fill(order["id"]) + filled_price = float(final_order.get("filled_avg_price") or price) + slip = abs(filled_price / price - 1) * 10_000 + fill = OrderFill( + symbol=symbol, + side=side, + qty=qty, + expected_px=price, + filled_avg_px=filled_price, + slip_bps=slip, + timestamp=datetime.utcnow(), + ) + fills.append(fill) + if slip > slip_alert_multiplier * modeled_medium_bps: + LOGGER.error( + "🚨 Slippage alert for %s: %.1f bps (threshold %.1f bps)", + symbol, + slip, + slip_alert_multiplier * modeled_medium_bps, + ) + if artifacts_dir is not None and fills: + write_fills(fills, artifacts_dir) + return fills + + +def write_fills(fills: list[OrderFill], artifacts_dir: Path) -> None: + records = [ + { + "ts_utc": fill.timestamp.isoformat(), + "symbol": fill.symbol, + "side": fill.side, + "qty": fill.qty, + "expected_px": fill.expected_px, + "filled_avg_px": fill.filled_avg_px, + "slip_bps": fill.slip_bps, + } + for fill in fills + ] + df = pd.DataFrame(records) + path = artifacts_dir / "fills.csv" + ensure_parent(path) + if path.exists(): + existing = pd.read_csv(path) + df = pd.concat([existing, df], ignore_index=True) + df.to_csv(path, index=False) + + +__all__ = ["AlpacaPaperTrader", "execute_rebalance", "write_fills", "OrderFill"] diff --git a/cerberus/src/strategy.py b/cerberus/src/strategy.py new file mode 100644 index 0000000..a45e9a2 --- /dev/null +++ 
b/cerberus/src/strategy.py @@ -0,0 +1,313 @@ +"""Cerberus v2 strategy engine implementing momentum + profitability.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict + +import numpy as np +import pandas as pd + +from .utils.logging import get_logger + +LOGGER = get_logger("strategy") + + +@dataclass +class BacktestResult: + """Container returned by :func:`run_backtest`.""" + + performance: pd.DataFrame + weights: Dict[pd.Timestamp, pd.Series] + + +def momentum_monthly(adj_close_daily: pd.DataFrame, lookback_m: int, skip_m: int) -> pd.DataFrame: + """Compute look-ahead-safe momentum using monthly price panels.""" + + monthly_prices = adj_close_daily.resample("M").last() + momentum = monthly_prices.pct_change(lookback_m) + momentum = momentum.shift(skip_m) + return momentum + + +def profitability_mask(gp_m: pd.DataFrame, assets_m: pd.DataFrame, top_pct: float = 0.5) -> pd.DataFrame: + """Return boolean mask for the profitability filter.""" + + ratio = gp_m.div(assets_m).replace([np.inf, -np.inf], np.nan) + ranks = ratio.rank(axis=1, pct=True, method="average") + mask = ranks >= (1 - top_pct) + mask = mask & ratio.notna() + return mask + + +def random_monthly_rebalance_days(daily_index: pd.DatetimeIndex, seed: int, last_n: int) -> pd.DatetimeIndex: + """Randomly select one of the last ``last_n`` trading days for each month.""" + + rng = np.random.default_rng(seed) + dates = [] + for (_, month_df) in daily_index.to_series().groupby([daily_index.year, daily_index.month]): + tail = month_df.tail(last_n) + if tail.empty: + continue + pick = rng.choice(tail.values) + dates.append(pd.Timestamp(pick)) + return pd.DatetimeIndex(sorted(dates)) + + +def adv_dollar_series(px: pd.DataFrame, vol: pd.DataFrame, window: int = 20) -> pd.DataFrame: + """Compute rolling average dollar volume (ADV).""" + + dollar = px * vol + adv = dollar.rolling(window=window, min_periods=1).mean() + return adv + + +def tiered_tc_smallmediumlarge( + 
position_change_dollar: pd.Series, + adv_dollar: pd.Series, + *, + small_threshold: float, + med_threshold: float, + rate_small_bps: float, + rate_med_bps: float, + rate_large_bps: float, + portfolio_value: float, +) -> float: + """Calculate tiered transaction costs as a fraction of portfolio value.""" + + if portfolio_value <= 0: + return 0.0 + adv_ratio = position_change_dollar.abs() / adv_dollar.replace(0, np.nan) + bucket = pd.Series(rate_large_bps, index=position_change_dollar.index, dtype=float) + bucket = bucket.where(adv_ratio > med_threshold, rate_med_bps) + bucket = bucket.where(adv_ratio > small_threshold, rate_small_bps) + bucket = bucket.fillna(rate_large_bps) + cost = position_change_dollar.abs() * (bucket / 10_000.0) + return float(cost.sum() / portfolio_value) + + +def _enforce_adv_caps( + weights: pd.Series, + adv_cap_weights: pd.Series, +) -> tuple[pd.Series, float]: + """Apply ADV caps and renormalise to 100% gross exposure.""" + + if weights.empty: + return weights, 0.0 + adv_cap_weights = adv_cap_weights.fillna(0.0) + capped = weights.copy() + binding = capped > adv_cap_weights + capped = capped.clip(upper=adv_cap_weights) + total = capped.sum() + capped_ratio = float(binding.sum() / len(capped)) if len(capped) else 0.0 + if total == 0: + return capped, capped_ratio + if total < 1.0: + slack = 1.0 - total + eligible = capped < adv_cap_weights - 1e-6 + if eligible.any(): + alloc = weights[eligible] + alloc_total = alloc.sum() + if alloc_total > 0: + capped.loc[eligible] += alloc / alloc_total * slack + else: + capped.loc[eligible] += slack / eligible.sum() + else: + capped /= total + elif total > 1.0: + capped /= total + return capped, capped_ratio + + +def _resolve_signal_date(reference_dates: pd.DatetimeIndex, target: pd.Timestamp) -> pd.Timestamp | None: + """Find the latest date in ``reference_dates`` strictly before ``target``.""" + + mask = reference_dates < target + if not mask.any(): + return None + return reference_dates[mask].max() + + 
+def run_backtest( + panels: Dict[str, pd.DataFrame], + config: dict, + *, + vix_monthly: pd.Series | None = None, +) -> BacktestResult: + """Run the Cerberus v2 backtest with the supplied configuration.""" + + adj_close = panels["adj_close_daily"].sort_index() + volume = panels["volume_daily"].reindex_like(adj_close) + membership = panels["sp500_membership_monthly"].sort_index() + gp = panels["gp_monthly_lag90"].sort_index() + assets = panels["assets_monthly_lag90"].reindex_like(gp) + + portfolio_cfg = config["portfolio"] + rebalance_cfg = config["rebalance"] + signal_cfg = config["signals"] + costs_cfg = config["costs"] + tax_cfg = config["tax"] + risk_cfg = config["risk"] + + daily_index = adj_close.index + rebalance_days = random_monthly_rebalance_days( + daily_index, seed=rebalance_cfg["seed"], last_n=rebalance_cfg["window_last_days"] + ) + + momentum = momentum_monthly( + adj_close, + lookback_m=signal_cfg["lookback_months"], + skip_m=signal_cfg["skip_months"], + ) + profitability = profitability_mask(gp, assets) + adv = adv_dollar_series(adj_close, volume) + + spy_ticker = "SPY" if "SPY" in adj_close.columns else adj_close.columns[0] + realized_vol = adj_close[spy_ticker].pct_change().rolling(20).std() * np.sqrt(252) + + equity = 1.0 + prev_weights = pd.Series(dtype=float) + prev_day: pd.Timestamp | None = None + weights_history: Dict[pd.Timestamp, pd.Series] = {} + records = [] + + membership_index = membership.index + momentum_index = momentum.index + profitability_index = profitability.index + + for day in rebalance_days: + membership_date = _resolve_signal_date(membership_index, day) + momentum_date = _resolve_signal_date(momentum_index, day) + profitability_date = _resolve_signal_date(profitability_index, day) + if membership_date is None or momentum_date is None or profitability_date is None: + continue + + eligible = membership.loc[membership_date] + available_prices = adj_close.loc[day].dropna() + eligible_names = 
eligible[eligible].index.intersection(available_prices.index) + + if eligible_names.empty: + target_weights = pd.Series(dtype=float) + else: + mom_scores = momentum.loc[momentum_date, eligible_names] + mom_scores = mom_scores.dropna() + profit_mask = profitability.loc[profitability_date].reindex(eligible_names).fillna(False) + candidates = mom_scores.index.intersection(profit_mask[profit_mask].index) + mom_scores = mom_scores.reindex(candidates).dropna() + mom_scores = mom_scores.sort_values(ascending=False) + top_n = min(portfolio_cfg["top_n"], len(mom_scores)) + selected = mom_scores.index[:top_n] + if len(selected) == 0: + target_weights = pd.Series(dtype=float) + else: + equal_weight = min(1.0 / len(selected), portfolio_cfg["max_pos_weight"]) + target_weights = pd.Series(equal_weight, index=selected) + target_weights = target_weights.clip(upper=portfolio_cfg["max_pos_weight"]) + total_w = target_weights.sum() + if total_w > 0: + target_weights /= total_w + else: + target_weights = pd.Series(dtype=float) + + adv_cap_pct = portfolio_cfg.get("adv_cap_pct", 0.05) + if not target_weights.empty: + adv_day = adv.loc[day].reindex(target_weights.index).fillna(0.0) + adv_cap_weight = adv_day * adv_cap_pct / max(equity, 1e-6) + target_weights, capped_ratio = _enforce_adv_caps(target_weights, adv_cap_weight) + if capped_ratio > 0: + LOGGER.info("ADV cap binding on %.1f%% of names at %s", capped_ratio * 100, day.date()) + + # Regime sizing + regime_scale = 1.0 + if risk_cfg.get("use_vix_filter", True): + vix_value = None + if vix_monthly is not None: + vix_date = _resolve_signal_date(vix_monthly.index, day) + if vix_date is not None: + vix_value = float(vix_monthly.loc[vix_date]) + if vix_value is None: + vol_series = realized_vol.loc[:day].dropna() + vol_value = float(vol_series.iloc[-1]) if not vol_series.empty else None + if vol_value is not None and vol_value > risk_cfg["realized_vol_threshold"]: + regime_scale = 0.5 + elif vix_value > risk_cfg["vix_threshold"]: + 
regime_scale = 0.5 + target_weights = target_weights * regime_scale + + # Compute gross return since previous rebalance + if prev_day is None or prev_weights.empty: + gross_ret = 0.0 + else: + mask = (adj_close.index > prev_day) & (adj_close.index <= day) + period_prices = adj_close.loc[mask, prev_weights.index] + period_rets = period_prices.pct_change().replace([np.inf, -np.inf], np.nan).fillna(0.0) + weighted = period_rets.mul(prev_weights, axis=1).sum(axis=1) + gross_ret = float((weighted + 1.0).prod() - 1.0) + + current_weights = target_weights.reindex(adj_close.columns).fillna(0.0) + prev_aligned = prev_weights.reindex(adj_close.columns).fillna(0.0) + weight_change = current_weights - prev_aligned + turnover = float(weight_change.abs().sum()) + if turnover > 2.0: + LOGGER.warning("Turnover %.2f exceeds 200%% at %s", turnover, day.date()) + + adv_day_full = adv.loc[day].reindex(adj_close.columns).fillna(0.0) + trade_dollars = weight_change * equity + tcost_fraction = tiered_tc_smallmediumlarge( + trade_dollars, + adv_day_full.replace(0.0, np.nan), + small_threshold=costs_cfg["small_threshold"], + med_threshold=costs_cfg["med_threshold"], + rate_small_bps=costs_cfg["rate_small_bps"], + rate_med_bps=costs_cfg["rate_med_bps"], + rate_large_bps=costs_cfg["rate_large_bps"], + portfolio_value=equity, + ) + tcost = -tcost_fraction + net_ret = gross_ret + tcost + if tax_cfg.get("apply_tax_drag", False) and net_ret > 0: + net_ret *= 1 - tax_cfg.get("stcg_rate", 0.35) + equity *= 1 + net_ret + + record = { + "date": day, + "gross_ret": gross_ret, + "tcost": tcost, + "net_ret": net_ret, + "equity": equity, + "turnover": turnover, + } + records.append(record) + + prev_weights = current_weights[current_weights != 0] + prev_day = day + weights_history[day] = prev_weights + + performance = pd.DataFrame(records).set_index("date").sort_index() + if performance.empty: + raise RuntimeError("Backtest produced no observations; check data coverage.") + + net_returns = 
performance["net_ret"] + roll_sharpe = net_returns.rolling(6).apply( + lambda x: (np.mean(x) / np.std(x)) * np.sqrt(12) if np.std(x) > 0 else np.nan, + raw=False, + ) + performance["roll_sharpe_6m"] = roll_sharpe + equity_curve = performance["equity"] + drawdown = equity_curve / equity_curve.cummax() - 1 + sharpe_breach = performance["roll_sharpe_6m"].rolling(6).apply(lambda x: float((x < 0).all()), raw=False) + dd_threshold = config.get("dashboard", {}).get("dd_alert", -0.2) + performance["kill_flag"] = (sharpe_breach == 1.0) | (drawdown <= dd_threshold) + + return BacktestResult(performance=performance, weights=weights_history) + + +__all__ = [ + "run_backtest", + "momentum_monthly", + "profitability_mask", + "random_monthly_rebalance_days", + "adv_dollar_series", + "tiered_tc_smallmediumlarge", + "BacktestResult", +] diff --git a/cerberus/src/utils/__init__.py b/cerberus/src/utils/__init__.py new file mode 100644 index 0000000..83d6481 --- /dev/null +++ b/cerberus/src/utils/__init__.py @@ -0,0 +1,14 @@ +"""Utility helpers for Project Cerberus. + +The subpackage currently groups together I/O utilities, calendar +functions, and structured logging configuration. These modules are kept +lightweight and side-effect free so that importing ``cerberus.src.utils`` +never triggers filesystem writes or logging configuration changes by +accident. +""" + +from __future__ import annotations + +from . 
import dates, io, logging + +__all__ = ["dates", "io", "logging"] diff --git a/cerberus/src/utils/dates.py b/cerberus/src/utils/dates.py new file mode 100644 index 0000000..bbf5a0b --- /dev/null +++ b/cerberus/src/utils/dates.py @@ -0,0 +1,54 @@ +"""Date helpers for trading calendar alignment.""" + +from __future__ import annotations + +from datetime import datetime +from typing import Iterable + +import numpy as np +import pandas as pd + + +def month_end_index(dates: Iterable[pd.Timestamp]) -> pd.DatetimeIndex: + """Return a ``DatetimeIndex`` containing unique month-end timestamps.""" + + index = pd.DatetimeIndex(pd.to_datetime(list(dates))).tz_localize(None) + month_end = index.to_series().groupby([index.year, index.month]).max() + return pd.DatetimeIndex(month_end.values, name="month_end") + + +def rolling_month_end_series(dates: pd.DatetimeIndex) -> pd.Series: + """Map each date to the month-end for its calendar month.""" + + month_ends = month_end_index(dates) + mapper = {d: me for me in month_ends for d in dates[(dates.year == me.year) & (dates.month == me.month)]} + return pd.Series(mapper) + + +def month_range(start: str | datetime, end: str | datetime) -> pd.DatetimeIndex: + """Inclusive range of month-end timestamps between ``start`` and ``end``.""" + + return pd.date_range(start=start, end=end, freq="M") + + +def last_trading_days(daily_index: pd.DatetimeIndex, last_n: int) -> dict[pd.Timestamp, pd.DatetimeIndex]: + """Return mapping from month-end to the last ``n`` trading days in that month. + + Parameters + ---------- + daily_index: + Trading-day index (assumed sorted and tz-naive). + last_n: + Number of trailing sessions to capture per month. 
+ """ + + month_groups = {} + series = pd.Series(index=daily_index, data=np.arange(len(daily_index))) + for (year, month), group in series.groupby([daily_index.year, daily_index.month]): + tail = group.tail(last_n) + month_end = tail.index[-1] + month_groups[month_end] = pd.DatetimeIndex(tail.index, name="rebalance_days") + return month_groups + + +__all__ = ["month_end_index", "month_range", "last_trading_days"] diff --git a/cerberus/src/utils/io.py b/cerberus/src/utils/io.py new file mode 100644 index 0000000..c11b2e9 --- /dev/null +++ b/cerberus/src/utils/io.py @@ -0,0 +1,44 @@ +"""Utility helpers for reading and and writing cached data panels.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Optional + +import pandas as pd + + +def ensure_parent(path: Path) -> None: + """Ensure the parent directory for ``path`` exists.""" + + path.parent.mkdir(parents=True, exist_ok=True) + + +def write_panel_csv( + frame: pd.DataFrame, + path: Path, + *, + float_format: Optional[str] = "%.6f", +) -> None: + """Persist a ``DataFrame`` to CSV with index retention.""" + + ensure_parent(path) + frame.to_csv(path, index=True, float_format=float_format) + + +def read_panel_csv( + path: Path, + *, + parse_dates: bool = True, + dtype: Optional[dict[str, str]] = None, +) -> pd.DataFrame: + """Read a panel CSV written by :func:`write_panel_csv`.""" + + kwargs: dict[str, object] = {"index_col": 0, "dtype": dtype} + if parse_dates: + kwargs["parse_dates"] = True + frame = pd.read_csv(path, **kwargs) + return frame + + +__all__ = ["write_panel_csv", "read_panel_csv", "ensure_parent"] diff --git a/cerberus/src/utils/logging.py b/cerberus/src/utils/logging.py new file mode 100644 index 0000000..d22f728 --- /dev/null +++ b/cerberus/src/utils/logging.py @@ -0,0 +1,44 @@ +"""Structured logging configuration for the Cerberus project.""" + +from __future__ import annotations + +import logging +from typing import Optional + + +_LOGGER_NAME = "cerberus" + + 
+def configure_logging(level: int = logging.INFO) -> logging.Logger: + """Configure a process-wide logger with timestamped output. + + The function is idempotent—subsequent calls will simply return the configured + logger. This avoids duplicate handlers when modules import the helper. + """ + + logger = logging.getLogger(_LOGGER_NAME) + if logger.handlers: + logger.setLevel(level) + return logger + + handler = logging.StreamHandler() + formatter = logging.Formatter( + "%(asctime)s | %(levelname)s | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + handler.setFormatter(formatter) + + logger.setLevel(level) + logger.addHandler(handler) + logger.propagate = False + return logger + + +def get_logger(name: Optional[str] = None) -> logging.Logger: + """Return a child logger of the Cerberus root logger.""" + + root = configure_logging() + return root.getChild(name) if name else root + + +__all__ = ["configure_logging", "get_logger"] diff --git a/cerberus/tests/conftest.py b/cerberus/tests/conftest.py new file mode 100644 index 0000000..2a855d9 --- /dev/null +++ b/cerberus/tests/conftest.py @@ -0,0 +1,6 @@ +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/cerberus/tests/test_engine.py b/cerberus/tests/test_engine.py new file mode 100644 index 0000000..83a0204 --- /dev/null +++ b/cerberus/tests/test_engine.py @@ -0,0 +1,51 @@ +import numpy as np +import pandas as pd + +from src.strategy import random_monthly_rebalance_days, run_backtest + + +def make_panels(): + dates = pd.date_range("2020-01-01", "2021-12-31", freq="B") + tickers = ["AAA", "BBB", "CCC"] + prices = pd.DataFrame( + {t: np.linspace(100 + i * 10, 200 + i * 10, len(dates)) for i, t in enumerate(tickers)}, index=dates + ) + volume = pd.DataFrame(1_000_000, index=dates, columns=tickers) + membership_idx = pd.date_range("2019-12-31", "2021-12-31", freq="M") + membership = pd.DataFrame(True, 
index=membership_idx, columns=tickers) + fundamentals_idx = membership_idx + gp = pd.DataFrame(5.0, index=fundamentals_idx, columns=tickers) + assets = pd.DataFrame(10.0, index=fundamentals_idx, columns=tickers) + return { + "adj_close_daily": prices, + "volume_daily": volume, + "sp500_membership_monthly": membership, + "gp_monthly_lag90": gp, + "assets_monthly_lag90": assets, + } + + +def test_rebalance_days_last_window(): + dates = pd.date_range("2021-01-01", "2021-12-31", freq="B") + rebalances = random_monthly_rebalance_days(dates, seed=1, last_n=3) + for day in rebalances: + month_days = dates[(dates.month == day.month) & (dates.year == day.year)] + assert day in month_days[-3:] + + +def test_backtest_equity_compounding(): + panels = make_panels() + config = { + "portfolio": {"top_n": 2, "max_pos_weight": 0.6, "adv_cap_pct": 0.05}, + "rebalance": {"seed": 1, "window_last_days": 3}, + "signals": {"lookback_months": 6, "skip_months": 1, "robustness_lookbacks": [], "robustness_topn": []}, + "costs": {"small_threshold": 0.01, "med_threshold": 0.02, "rate_small_bps": 10, "rate_med_bps": 20, "rate_large_bps": 50}, + "tax": {"apply_tax_drag": False, "stcg_rate": 0.35}, + "risk": {"use_vix_filter": False, "vix_threshold": 30, "realized_vol_threshold": 0.25}, + "dashboard": {"dd_alert": -0.2}, + "evaluation": {"pbo_trials": 10, "min_sharpe": 0.0, "min_calmar": 0.0}, + } + result = run_backtest(panels, config) + perf = result.performance + equity_calc = (1 + perf["net_ret"]).cumprod() + pd.testing.assert_series_equal(equity_calc, perf["equity"], check_names=False) diff --git a/cerberus/tests/test_membership.py b/cerberus/tests/test_membership.py new file mode 100644 index 0000000..6e1d261 --- /dev/null +++ b/cerberus/tests/test_membership.py @@ -0,0 +1,35 @@ +import pandas as pd +import pytest + +from src.data_free import FreeDataConfig, build_sp500_membership + + +def test_membership_validation(monkeypatch): + base_tickers = [f"T{i:03d}" for i in range(500)] + + def 
fake_changes(_): + rows = [] + date = pd.Timestamp("2010-01-01") + for i in range(24): + rows.append({"date": date + pd.DateOffset(months=i), "tickers": ",".join(base_tickers)}) + return pd.DataFrame(rows) + + def fake_current(): + return set(base_tickers) + + monkeypatch.setattr("src.data_free._fetch_sp500_changes", fake_changes) + monkeypatch.setattr("src.data_free._fetch_current_sp500", fake_current) + + config = FreeDataConfig( + start=pd.Timestamp("2010-01-01"), + sp500_changes_csv="", + cache_raw=False, + cache_interim=False, + tickers_override=[], + ) + + membership = build_sp500_membership(config) + counts = membership.sum(axis=1) + assert (counts.between(450, 550).mean()) >= 0.9 + latest = membership.index.max() + assert membership.loc[latest, base_tickers].all() diff --git a/cerberus/tests/test_metrics.py b/cerberus/tests/test_metrics.py new file mode 100644 index 0000000..c3123ee --- /dev/null +++ b/cerberus/tests/test_metrics.py @@ -0,0 +1,21 @@ +import numpy as np +import pandas as pd + +from src.metrics import deflated_sharpe, simple_cscv_pbo + + +def test_deflated_sharpe_finite(): + rng = np.random.default_rng(0) + returns = pd.Series(rng.normal(0.01, 0.05, 120)) + dsr = deflated_sharpe(returns, n_trials=50) + assert np.isfinite(dsr) + + +def test_pbo_behaviour(): + idx = pd.date_range("2020-01-31", periods=60, freq="M") + base = pd.Series(0.01, index=idx) + noisy = pd.Series(np.where(np.arange(len(idx)) % 2 == 0, 0.03, -0.04), index=idx) + variants = [base, base + 0.0005, noisy] + pbo = simple_cscv_pbo(variants) + assert 0 <= pbo <= 1 + assert pbo >= 0 diff --git a/cerberus/tests/test_signals.py b/cerberus/tests/test_signals.py new file mode 100644 index 0000000..68ea7ae --- /dev/null +++ b/cerberus/tests/test_signals.py @@ -0,0 +1,26 @@ +import pandas as pd +import numpy as np + +from src.strategy import momentum_monthly, profitability_mask + + +def test_momentum_no_lookahead(): + dates = pd.date_range("2020-01-01", periods=120, freq="B") + prices 
= pd.DataFrame({"AAA": np.linspace(100, 200, len(dates))}, index=dates) + mom = momentum_monthly(prices, lookback_m=12, skip_m=1) + assert mom.index.is_monotonic_increasing + assert mom.iloc[1:].notna().all().all() + # signal for month t should use prices up to t-1 + monthly = prices.resample("M").last() + expected = monthly.pct_change(12).shift(1) + pd.testing.assert_series_equal(mom["AAA"], expected["AAA"], check_names=False) + + +def test_profitability_mask_rank(): + idx = pd.date_range("2020-01-31", periods=6, freq="M") + gp = pd.DataFrame(np.random.uniform(1, 10, size=(6, 4)), index=idx, columns=list("ABCD")) + assets = pd.DataFrame(np.random.uniform(5, 15, size=(6, 4)), index=idx, columns=list("ABCD")) + mask = profitability_mask(gp, assets) + assert mask.shape == gp.shape + assert mask.iloc[0].isin([True, False]).all() + assert (mask.sum(axis=1) >= 2).all()