diff --git a/.gitignore b/.gitignore
index acfdc6647..3726d4cfa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -64,4 +64,6 @@ outputs/
 *.sqlite3
 
 # Alembic (keep migration files, but ignore generated/cache files)
-app/backend/alembic/versions/__pycache__/
\ No newline at end of file
+app/backend/alembic/versions/__pycache__/
+
+.cache
\ No newline at end of file
diff --git a/src/data/cache.py b/src/data/cache.py
index 4127934e3..b1c8704b0 100644
--- a/src/data/cache.py
+++ b/src/data/cache.py
@@ -1,12 +1,167 @@
+import json
+from pathlib import Path
+from urllib.parse import quote
+
+
 class Cache:
-    """In-memory cache for API responses."""
+    """File-based cache for API responses stored in .cache directory."""
+
+    def __init__(self, cache_dir: str = ".cache"):
+        """
+        Initialize the cache with a base directory.
+
+        Args:
+            cache_dir: Base directory for cache files (default: ".cache")
+        """
+        # Cache type to subdirectory mapping
+        self._cache_types = {
+            "prices": "prices",
+            "financial_metrics": "financial_metrics",
+            "line_items": "line_items",
+            "insider_trades": "insider_trades",
+            "company_news": "company_news",
+            "market_cap": "market_cap",
+        }
+
+        self.cache_dir = Path(cache_dir)
+        self._ensure_cache_dirs()
+
+        # Statistics file path
+        self.stats_file = self.cache_dir / "cache_stats.json"
+        self.stats = self._load_stats()
+
+
+
+    def _ensure_cache_dirs(self):
+        """Ensure all cache subdirectories exist."""
+        self.cache_dir.mkdir(parents=True, exist_ok=True)
+        for subdir in self._cache_types.values():
+            (self.cache_dir / subdir).mkdir(parents=True, exist_ok=True)
+
+    def _load_stats(self) -> dict:
+        """Load cache statistics from file."""
+        if not self.stats_file.exists():
+            return {
+                "total_hits": 0,
+                "total_api_calls": 0,
+                "by_type": {
+                    "prices": 0,
+                    "financial_metrics": 0,
+                    "line_items": 0,
+                    "insider_trades": 0,
+                    "company_news": 0,
+                    "market_cap": 0,
+                },
+                "api_calls_by_type": {
+                    "prices": 0,
+                    "financial_metrics": 0,
+                    "line_items": 0,
+                    "insider_trades": 0,
+                    "company_news": 0,
+                    "market_cap": 0,
+                }
+            }
+
+        try:
+            with open(self.stats_file, "r", encoding="utf-8") as f:
+                stats = json.load(f)
+            # Ensure all cache types are present
+            if "by_type" not in stats:
+                stats["by_type"] = {}
+            if "api_calls_by_type" not in stats:
+                stats["api_calls_by_type"] = {}
+            for cache_type in self._cache_types.keys():
+                if cache_type not in stats["by_type"]:
+                    stats["by_type"][cache_type] = 0
+                if cache_type not in stats["api_calls_by_type"]:
+                    stats["api_calls_by_type"][cache_type] = 0
+            if "total_hits" not in stats:
+                stats["total_hits"] = 0
+            if "total_api_calls" not in stats:
+                stats["total_api_calls"] = 0
+            return stats
+        except (json.JSONDecodeError, IOError) as e:
+            print(f"Warning: Failed to load cache stats: {e}")
+            return {
+                "total_hits": 0,
+                "total_api_calls": 0,
+                "by_type": {cache_type: 0 for cache_type in self._cache_types.keys()},
+                "api_calls_by_type": {cache_type: 0 for cache_type in self._cache_types.keys()}
+            }
+
+    def _save_stats(self):
+        """Save cache statistics to file."""
+        try:
+            with open(self.stats_file, "w", encoding="utf-8") as f:
+                json.dump(self.stats, f, ensure_ascii=False, indent=2)
+        except IOError as e:
+            print(f"Warning: Failed to save cache stats: {e}")
+
+    def record_cache_hit(self, cache_type: str):
+        """Record a cache hit for the given cache type."""
+        if cache_type in self._cache_types:
+            self.stats["total_hits"] = self.stats.get("total_hits", 0) + 1
+            self.stats["by_type"][cache_type] = self.stats["by_type"].get(cache_type, 0) + 1
+            self._save_stats()
+
+    def record_api_call(self, cache_type: str):
+        """Record an API call for the given cache type (cache miss)."""
+        if cache_type in self._cache_types:
+            self.stats["total_api_calls"] = self.stats.get("total_api_calls", 0) + 1
+            if "api_calls_by_type" not in self.stats:
+                self.stats["api_calls_by_type"] = {}
+            self.stats["api_calls_by_type"][cache_type] = self.stats["api_calls_by_type"].get(cache_type, 0) + 1
+            self._save_stats()
+
+    def _ticker_to_filename(self, ticker: str) -> str:
+        """Convert ticker to a safe filename by encoding special characters."""
+        # Replace common special characters with safe alternatives
+        # Use URL encoding for safety
+        safe_ticker = quote(ticker, safe="")
+        return f"{safe_ticker}.json"
+
+    def _get_cache_path(self, cache_type: str, ticker: str) -> Path:
+        """Get the file path for a specific cache entry."""
+        subdir = self._cache_types.get(cache_type)
+        if not subdir:
+            raise ValueError(f"Unknown cache type: {cache_type}")
+        filename = self._ticker_to_filename(ticker)
+        return self.cache_dir / subdir / filename
 
-    def __init__(self):
-        self._prices_cache: dict[str, list[dict[str, any]]] = {}
-        self._financial_metrics_cache: dict[str, list[dict[str, any]]] = {}
-        self._line_items_cache: dict[str, list[dict[str, any]]] = {}
-        self._insider_trades_cache: dict[str, list[dict[str, any]]] = {}
-        self._company_news_cache: dict[str, list[dict[str, any]]] = {}
+    def _get_metadata_path(self, cache_type: str, ticker: str) -> Path:
+        """Get the file path for cache metadata (last_updated date)."""
+        subdir = self._cache_types.get(cache_type)
+        if not subdir:
+            raise ValueError(f"Unknown cache type: {cache_type}")
+        filename = self._ticker_to_filename(ticker)
+        # Metadata file has .meta.json extension
+        return self.cache_dir / subdir / f"{filename}.meta.json"
+
+    def _load_from_file(self, cache_type: str, ticker: str) -> list[dict[str, any]] | None:
+        """Load cached data from file."""
+        cache_path = self._get_cache_path(cache_type, ticker)
+        if not cache_path.exists():
+            return None
+
+        try:
+            with open(cache_path, "r", encoding="utf-8") as f:
+                data = json.load(f)
+            return data if isinstance(data, list) else None
+        except (json.JSONDecodeError, IOError) as e:
+            print(f"Warning: Failed to load cache from {cache_path}: {e}")
+            return None
+
+    def _save_to_file(self, cache_type: str, ticker: str, data: list[dict[str, any]]):
+        """Save cached data to file."""
+        cache_path = self._get_cache_path(cache_type, ticker)
+        try:
+            # Ensure parent directory exists
+            cache_path.parent.mkdir(parents=True, exist_ok=True)
+
+            with open(cache_path, "w", encoding="utf-8") as f:
+                json.dump(data, f, ensure_ascii=False, indent=2)
+        except IOError as e:
+            print(f"Warning: Failed to save cache to {cache_path}: {e}")
 
     def _merge_data(self, existing: list[dict] | None, new_data: list[dict], key_field: str) -> list[dict]:
         """Merge existing and new data, avoiding duplicates based on a key field."""
@@ -23,43 +178,186 @@ def _merge_data(self, existing: list[dict] | None, new_data: list[dict], key_fie
 
     def get_prices(self, ticker: str) -> list[dict[str, any]] | None:
         """Get cached price data if available."""
-        return self._prices_cache.get(ticker)
+        return self._load_from_file("prices", ticker)
 
     def set_prices(self, ticker: str, data: list[dict[str, any]]):
         """Append new price data to cache."""
-        self._prices_cache[ticker] = self._merge_data(self._prices_cache.get(ticker), data, key_field="time")
+        existing = self.get_prices(ticker)
+        merged = self._merge_data(existing, data, key_field="time")
+        self._save_to_file("prices", ticker, merged)
 
-    def get_financial_metrics(self, ticker: str) -> list[dict[str, any]]:
+    def get_financial_metrics(self, ticker: str, period: str = "ttm") -> list[dict[str, any]] | None:
         """Get cached financial metrics if available."""
-        return self._financial_metrics_cache.get(ticker)
+        cache_key = f"{ticker}_{period}"
+        return self._load_from_file("financial_metrics", cache_key)
+
+    def set_financial_metrics(self, ticker: str, period: str, data: list[dict[str, any]], update_date: str = None):
+        """Update financial metrics cache with new data."""
+        cache_key = f"{ticker}_{period}"
+        existing = self.get_financial_metrics(ticker, period)
+        merged = self._merge_data(existing, data, key_field="report_period")
+        # Sort by report_period descending (newest first)
+        merged.sort(key=lambda x: x.get("report_period", ""), reverse=True)
+        self._save_to_file("financial_metrics", cache_key, merged)
+        # Update last_updated date if provided
+        if update_date:
+            self.set_last_updated_date("financial_metrics", cache_key, update_date)
 
-    def set_financial_metrics(self, ticker: str, data: list[dict[str, any]]):
-        """Append new financial metrics to cache."""
-        self._financial_metrics_cache[ticker] = self._merge_data(self._financial_metrics_cache.get(ticker), data, key_field="report_period")
+    def get_last_updated_date(self, cache_type: str, ticker: str) -> str | None:
+        """Get the last updated date (query date) for a cache entry."""
+        metadata_path = self._get_metadata_path(cache_type, ticker)
+        if not metadata_path.exists():
+            return None
+
+        try:
+            with open(metadata_path, "r", encoding="utf-8") as f:
+                metadata = json.load(f)
+            return metadata.get("last_updated")
+        except (json.JSONDecodeError, IOError) as e:
+            print(f"Warning: Failed to load metadata from {metadata_path}: {e}")
+            return None
 
-    def get_line_items(self, ticker: str) -> list[dict[str, any]] | None:
+    def set_last_updated_date(self, cache_type: str, ticker: str, date: str):
+        """Set the last updated date (query date) for a cache entry."""
+        metadata_path = self._get_metadata_path(cache_type, ticker)
+        try:
+            metadata_path.parent.mkdir(parents=True, exist_ok=True)
+            with open(metadata_path, "w", encoding="utf-8") as f:
+                json.dump({"last_updated": date}, f, ensure_ascii=False, indent=2)
+        except IOError as e:
+            print(f"Warning: Failed to save metadata to {metadata_path}: {e}")
+
+    def get_line_items(self, ticker: str, period: str = "ttm") -> list[dict[str, any]] | None:
         """Get cached line items if available."""
-        return self._line_items_cache.get(ticker)
+        cache_key = f"{ticker}_{period}"
+        return self._load_from_file("line_items", cache_key)
 
-    def set_line_items(self, ticker: str, data: list[dict[str, any]]):
-        """Append new line items to cache."""
-        self._line_items_cache[ticker] = self._merge_data(self._line_items_cache.get(ticker), data, key_field="report_period")
+    def set_line_items(self, ticker: str, period: str, data: list[dict[str, any]], update_date: str = None):
+        """Update line items cache with new data."""
+        cache_key = f"{ticker}_{period}"
+        existing = self.get_line_items(ticker, period)
+        merged = self._merge_data(existing, data, key_field="report_period")
+        # Sort by report_period descending (newest first)
+        merged.sort(key=lambda x: x.get("report_period", ""), reverse=True)
+        self._save_to_file("line_items", cache_key, merged)
+        # Update last_updated date if provided
+        if update_date:
+            self.set_last_updated_date("line_items", cache_key, update_date)
 
     def get_insider_trades(self, ticker: str) -> list[dict[str, any]] | None:
         """Get cached insider trades if available."""
-        return self._insider_trades_cache.get(ticker)
+        return self._load_from_file("insider_trades", ticker)
 
-    def set_insider_trades(self, ticker: str, data: list[dict[str, any]]):
-        """Append new insider trades to cache."""
-        self._insider_trades_cache[ticker] = self._merge_data(self._insider_trades_cache.get(ticker), data, key_field="filing_date")  # Could also use transaction_date if preferred
+    def set_insider_trades(self, ticker: str, data: list[dict[str, any]], update_date: str = None):
+        """Update insider trades cache with new data."""
+        existing = self.get_insider_trades(ticker)
+        merged = self._merge_data(existing, data, key_field="filing_date")
+        # Sort by filing_date descending (newest first)
+        merged.sort(key=lambda x: x.get("filing_date", ""), reverse=True)
+        self._save_to_file("insider_trades", ticker, merged)
+        # Update last_updated date if provided
+        if update_date:
+            self.set_last_updated_date("insider_trades", ticker, update_date)
 
     def get_company_news(self, ticker: str) -> list[dict[str, any]] | None:
         """Get cached company news if available."""
-        return self._company_news_cache.get(ticker)
+        return self._load_from_file("company_news", ticker)
+
+    def set_company_news(self, ticker: str, data: list[dict[str, any]], update_date: str = None):
+        """Update company news cache with new data."""
+        existing = self.get_company_news(ticker)
+        merged = self._merge_data(existing, data, key_field="date")
+        # Sort by date descending (newest first)
+        merged.sort(key=lambda x: x.get("date", ""), reverse=True)
+        self._save_to_file("company_news", ticker, merged)
+        # Update last_updated date if provided
+        if update_date:
+            self.set_last_updated_date("company_news", ticker, update_date)
+
+    def get_market_cap(self, ticker: str) -> list[dict[str, any]] | None:
+        """Get cached market cap data if available."""
+        return self._load_from_file("market_cap", ticker)
+
+    def set_market_cap(self, ticker: str, data: list[dict[str, any]]):
+        """Update market cap cache with new data."""
+        existing = self.get_market_cap(ticker)
+        merged = self._merge_data(existing, data, key_field="date")
+        # Sort by date descending (newest first)
+        merged.sort(key=lambda x: x.get("date", ""), reverse=True)
+        self._save_to_file("market_cap", ticker, merged)
+
+    def get_market_cap_by_date(self, ticker: str, date: str) -> float | None:
+        """Get market cap for a specific date from cache."""
+        cached_data = self.get_market_cap(ticker)
+        if not cached_data:
+            return None
+
+        # Find the entry with matching date
+        for entry in cached_data:
+            if entry.get("date") == date:
+                return entry.get("market_cap")
+
+        return None
+
+    def get_latest_market_cap_date(self, ticker: str) -> str | None:
+        """Get the latest date in market cap cache."""
+        cached_data = self.get_market_cap(ticker)
+        if not cached_data:
+            return None
+
+        # Data should be sorted by date descending, so first entry is latest
+        if cached_data:
+            return cached_data[0].get("date")
+
+        return None
+
+    def get_cache_stats(self) -> dict:
+        """Get cache statistics."""
+        return self.stats.copy()
 
-    def set_company_news(self, ticker: str, data: list[dict[str, any]]):
-        """Append new company news to cache."""
-        self._company_news_cache[ticker] = self._merge_data(self._company_news_cache.get(ticker), data, key_field="date")
+    def print_cache_stats(self):
+        """Print cache statistics in a user-friendly format."""
+        stats = self.get_cache_stats()
+        total_hits = stats.get("total_hits", 0)
+        total_api_calls = stats.get("total_api_calls", 0)
+        by_type = stats.get("by_type", {})
+        api_calls_by_type = stats.get("api_calls_by_type", {})
+
+        print("\n" + "=" * 70)
+        print("Cache Statistics")
+        print("=" * 70)
+        print(f"Total Cache Hits (API calls avoided): {total_hits:,}")
+        print(f"Total API Calls: {total_api_calls:,}")
+        if total_hits + total_api_calls > 0:
+            cache_hit_rate = (total_hits / (total_hits + total_api_calls)) * 100
+            print(f"Cache Hit Rate: {cache_hit_rate:.1f}%")
+        print("\nBy Type:")
+        print("-" * 70)
+
+        # Cache type display names
+        type_names = {
+            "prices": "Prices",
+            "financial_metrics": "Financial Metrics",
+            "line_items": "Line Items",
+            "insider_trades": "Insider Trades",
+            "company_news": "Company News",
+            "market_cap": "Market Cap",
+        }
+
+        # Print cache hits
+        print("Cache Hits (API calls avoided):")
+        for cache_type, count in sorted(by_type.items(), key=lambda x: x[1], reverse=True):
+            name = type_names.get(cache_type, cache_type)
+            percentage = (count / total_hits * 100) if total_hits > 0 else 0
+            print(f"  {name:35s}: {count:6,} ({percentage:5.1f}%)")
+
+        print("\nAPI Calls:")
+        for cache_type, count in sorted(api_calls_by_type.items(), key=lambda x: x[1], reverse=True):
+            name = type_names.get(cache_type, cache_type)
+            percentage = (count / total_api_calls * 100) if total_api_calls > 0 else 0
+            print(f"  {name:35s}: {count:6,} ({percentage:5.1f}%)")
+
+        print("=" * 70 + "\n")
 
 
 # Global cache instance
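
Reviewer note, not part of the patch: a minimal usage sketch of the new file-based Cache above, under the assumption that the class is used exactly as defined in this diff. The ticker symbols and price row are made-up sample values; _get_cache_path is called directly only to illustrate the on-disk layout.

# Illustrative sketch only (not part of the patch).
from src.data.cache import Cache

cache = Cache(cache_dir=".cache")

# set_prices() merges the new rows with whatever is already on disk and
# rewrites .cache/prices/AAPL.json as pretty-printed JSON.
cache.set_prices("AAPL", [{"time": "2024-01-02", "open": 185.0, "close": 186.0}])
print(cache.get_prices("AAPL"))                      # [{"time": "2024-01-02", ...}]

# Tickers are URL-quoted for the filename, so "BRK/B" maps to "BRK%2FB.json".
print(cache._get_cache_path("prices", "BRK/B"))      # .cache/prices/BRK%2FB.json

# Hits and misses are tallied and persisted in .cache/cache_stats.json.
cache.record_cache_hit("prices")
print(cache.get_cache_stats()["by_type"]["prices"])  # 1 on a fresh stats file
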
stats.get("total_api_calls", 0) + by_type = stats.get("by_type", {}) + api_calls_by_type = stats.get("api_calls_by_type", {}) + + print("\n" + "=" * 70) + print("Cache Statistics") + print("=" * 70) + print(f"Total Cache Hits (API calls avoided): {total_hits:,}") + print(f"Total API Calls: {total_api_calls:,}") + if total_hits + total_api_calls > 0: + cache_hit_rate = (total_hits / (total_hits + total_api_calls)) * 100 + print(f"Cache Hit Rate: {cache_hit_rate:.1f}%") + print("\nBy Type:") + print("-" * 70) + + # Cache type display names + type_names = { + "prices": "Prices", + "financial_metrics": "Financial Metrics", + "line_items": "Line Items", + "insider_trades": "Insider Trades", + "company_news": "Company News", + "market_cap": "Market Cap", + } + + # Print cache hits + print("Cache Hits (API calls avoided):") + for cache_type, count in sorted(by_type.items(), key=lambda x: x[1], reverse=True): + name = type_names.get(cache_type, cache_type) + percentage = (count / total_hits * 100) if total_hits > 0 else 0 + print(f" {name:35s}: {count:6,} ({percentage:5.1f}%)") + + print("\nAPI Calls:") + for cache_type, count in sorted(api_calls_by_type.items(), key=lambda x: x[1], reverse=True): + name = type_names.get(cache_type, cache_type) + percentage = (count / total_api_calls * 100) if total_api_calls > 0 else 0 + print(f" {name:35s}: {count:6,} ({percentage:5.1f}%)") + + print("=" * 70 + "\n") # Global cache instance diff --git a/src/main.py b/src/main.py index 72f61b846..6944723eb 100644 --- a/src/main.py +++ b/src/main.py @@ -15,6 +15,7 @@ from src.cli.input import ( parse_cli_inputs, ) +from src.data.cache import get_cache import argparse from datetime import datetime @@ -177,3 +178,7 @@ def create_workflow(selected_analysts=None): model_provider=inputs.model_provider, ) print_trading_output(result) + + # Display cache statistics + cache = get_cache() + cache.print_cache_stats() diff --git a/src/tools/api.py b/src/tools/api.py index 60ccbdc9b..9480ca4fc 100644 --- a/src/tools/api.py +++ b/src/tools/api.py @@ -64,9 +64,11 @@ def get_prices(ticker: str, start_date: str, end_date: str, api_key: str = None) # Check cache first - simple exact match if cached_data := _cache.get_prices(cache_key): + _cache.record_cache_hit("prices") return [Price(**price) for price in cached_data] # If not in cache, fetch from API + _cache.record_api_call("prices") headers = {} financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY") if financial_api_key: @@ -97,34 +99,56 @@ def get_financial_metrics( api_key: str = None, ) -> list[FinancialMetrics]: """Fetch financial metrics from cache or API.""" - # Create a cache key that includes all parameters to ensure exact matches - cache_key = f"{ticker}_{period}_{end_date}_{limit}" + today = datetime.datetime.now().strftime("%Y-%m-%d") - # Check cache first - simple exact match - if cached_data := _cache.get_financial_metrics(cache_key): - return [FinancialMetrics(**metric) for metric in cached_data] - - # If not in cache, fetch from API - headers = {} - financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY") - if financial_api_key: - headers["X-API-KEY"] = financial_api_key + # Check cache first + cached_data = _cache.get_financial_metrics(ticker, period) + cache_key = f"{ticker}_{period}" + # Use last_updated_date (query date) instead of data's latest date + latest_cached_date = _cache.get_last_updated_date("financial_metrics", cache_key) + + # Check if we need to refresh cache + # Refresh if cache doesn't exist or last 
@@ -97,34 +99,56 @@ def get_financial_metrics(
     api_key: str = None,
 ) -> list[FinancialMetrics]:
     """Fetch financial metrics from cache or API."""
-    # Create a cache key that includes all parameters to ensure exact matches
-    cache_key = f"{ticker}_{period}_{end_date}_{limit}"
+    today = datetime.datetime.now().strftime("%Y-%m-%d")
 
-    # Check cache first - simple exact match
-    if cached_data := _cache.get_financial_metrics(cache_key):
-        return [FinancialMetrics(**metric) for metric in cached_data]
-
-    # If not in cache, fetch from API
-    headers = {}
-    financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
-    if financial_api_key:
-        headers["X-API-KEY"] = financial_api_key
+    # Check cache first
+    cached_data = _cache.get_financial_metrics(ticker, period)
+    cache_key = f"{ticker}_{period}"
+    # Use last_updated_date (query date) instead of data's latest date
+    latest_cached_date = _cache.get_last_updated_date("financial_metrics", cache_key)
+
+    # Check if we need to refresh cache
+    # Refresh if cache doesn't exist or last query date is not today
+    need_refresh = latest_cached_date is None or latest_cached_date != today
+
+    # If cache exists and doesn't need refresh, record cache hit and use cache
+    if cached_data and not need_refresh:
+        _cache.record_cache_hit("financial_metrics")
+    # If cache needs refresh, fetch data up to today from API
+    elif need_refresh:
+        _cache.record_api_call("financial_metrics")
+        headers = {}
+        financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
+        if financial_api_key:
+            headers["X-API-KEY"] = financial_api_key
 
-    url = f"https://api.financialdatasets.ai/financial-metrics/?ticker={ticker}&report_period_lte={end_date}&limit={limit}&period={period}"
-    response = _make_api_request(url, headers)
-    if response.status_code != 200:
-        raise Exception(f"Error fetching data: {ticker} - {response.status_code} - {response.text}")
+        # Always fetch data up to today (using today as end_date)
+        url = f"https://api.financialdatasets.ai/financial-metrics/?ticker={ticker}&report_period_lte={today}&limit=100&period={period}"
+        response = _make_api_request(url, headers)
+        if response.status_code != 200:
+            raise Exception(f"Error fetching data: {ticker} - {response.status_code} - {response.text}")
 
-    # Parse response with Pydantic model
-    metrics_response = FinancialMetricsResponse(**response.json())
-    financial_metrics = metrics_response.financial_metrics
+        # Parse response with Pydantic model
+        metrics_response = FinancialMetricsResponse(**response.json())
+        financial_metrics = metrics_response.financial_metrics
 
-    if not financial_metrics:
+        if financial_metrics:
+            # Cache the results (only ticker and period in cache key) and update last_updated_date
+            _cache.set_financial_metrics(ticker, period, [m.model_dump() for m in financial_metrics], update_date=today)
+            # Update cached_data for filtering
+            cached_data = _cache.get_financial_metrics(ticker, period)
+
+    # Filter cached data based on end_date and limit
+    if not cached_data:
         return []
-
-    # Cache the results as dicts using the comprehensive cache key
-    _cache.set_financial_metrics(cache_key, [m.model_dump() for m in financial_metrics])
-    return financial_metrics
+
+    # Filter by end_date (report_period <= end_date) and limit
+    filtered_data = [
+        metric for metric in cached_data
+        if metric.get("report_period", "") <= end_date
+    ][:limit]
+
+    return [FinancialMetrics(**metric) for metric in filtered_data]
 
 
 def search_line_items(
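
Reviewer note, not part of the patch: the same pattern of refreshing at most once per calendar day and then answering each query by filtering the cached superset recurs in the remaining hunks. A standalone sketch of that filtering step, with invented metric rows and dates:

import datetime

# Refresh only if the stored query date (from the .meta.json file) is not today.
today = datetime.datetime.now().strftime("%Y-%m-%d")
last_updated = "2024-05-01"   # stand-in for _cache.get_last_updated_date(...)
need_refresh = last_updated is None or last_updated != today

# Sample cached metrics, already sorted newest-first as set_financial_metrics stores them.
cached = [
    {"report_period": "2024-03-31", "pe_ratio": 28.1},
    {"report_period": "2023-12-31", "pe_ratio": 26.4},
    {"report_period": "2023-09-30", "pe_ratio": 25.0},
]

# Local filtering exactly as in the hunk above: report_period <= end_date, then limit.
end_date, limit = "2023-12-31", 2
filtered = [m for m in cached if m.get("report_period", "") <= end_date][:limit]
print(filtered)  # keeps the 2023-12-31 and 2023-09-30 rows
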
@@ -135,33 +159,125 @@
     limit: int = 10,
     api_key: str = None,
 ) -> list[LineItem]:
-    """Fetch line items from API."""
-    # If not in cache or insufficient data, fetch from API
-    headers = {}
-    financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
-    if financial_api_key:
-        headers["X-API-KEY"] = financial_api_key
-
-    url = "https://api.financialdatasets.ai/financials/search/line-items"
+    """Fetch line items from cache or API."""
+    today = datetime.datetime.now().strftime("%Y-%m-%d")
+
+    # Check cache first (only by ticker and period, no end_date/limit/line_items)
+    cached_data = _cache.get_line_items(ticker, period)
+    cache_key = f"{ticker}_{period}"
+    # Use last_updated_date (query date) instead of data's latest date
+    latest_cached_date = _cache.get_last_updated_date("line_items", cache_key)
+
+    # Check if we need to refresh cache
+    # Refresh if cache doesn't exist or last query date is not today
+    need_refresh = latest_cached_date is None or latest_cached_date != today
+
+    # If cache exists and doesn't need refresh, record cache hit and use cache
+    if cached_data and not need_refresh:
+        _cache.record_cache_hit("line_items")
+    # If cache needs refresh, fetch all available line items
+    elif need_refresh:
+        _cache.record_api_call("line_items")
+        headers = {}
+        financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
+        if financial_api_key:
+            headers["X-API-KEY"] = financial_api_key
 
-    body = {
-        "tickers": [ticker],
-        "line_items": line_items,
-        "end_date": end_date,
-        "period": period,
-        "limit": limit,
-    }
-    response = _make_api_request(url, headers, method="POST", json_data=body)
-    if response.status_code != 200:
-        raise Exception(f"Error fetching data: {ticker} - {response.status_code} - {response.text}")
-    data = response.json()
-    response_model = LineItemResponse(**data)
-    search_results = response_model.search_results
-    if not search_results:
+        url = "https://api.financialdatasets.ai/financials/search/line-items"
+
+        # Common line items list for initial fetch (comprehensive list)
+        common_line_items = [
+            "ebit",
+            "interest_expense",
+            "capital_expenditure",
+            "depreciation_and_amortization",
+            "outstanding_shares",
+            "net_income",
+            "total_debt",
+            "earnings_per_share",
+            "revenue",
+            "book_value_per_share",
+            "total_assets",
+            "total_liabilities",
+            "current_assets",
+            "current_liabilities",
+            "dividends_and_other_cash_distributions",
+            "operating_margin",
+            "debt_to_equity",
+            "free_cash_flow",
+            "gross_margin",
+            "research_and_development",
+            "operating_expense",
+            "operating_income",
+            "return_on_invested_capital",
+            "cash_and_equivalents",
+            "shareholders_equity",
+            "goodwill_and_intangible_assets",
+            "issuance_or_purchase_of_equity_shares",
+            "gross_profit",
+            "ebitda",
+            "working_capital",
+        ]
+
+        # First fetch: get all common line items with large limit and today's date
+        body = {
+            "tickers": [ticker],
+            "line_items": common_line_items,
+            "end_date": today,
+            "period": period,
+            "limit": 1000,  # Large limit to get all available periods
+        }
+        response = _make_api_request(url, headers, method="POST", json_data=body)
+        if response.status_code != 200:
+            # If failed, try with empty line_items to get all available
+            body["line_items"] = []
+            response = _make_api_request(url, headers, method="POST", json_data=body)
+            if response.status_code != 200:
+                raise Exception(f"Error fetching data: {ticker} - {response.status_code} - {response.text}")
+
+        data = response.json()
+        response_model = LineItemResponse(**data)
+        search_results = response_model.search_results
+
+        if search_results:
+            # Cache all results (only ticker and period in cache key) and update last_updated_date
+            _cache.set_line_items(ticker, period, [item.model_dump() for item in search_results], update_date=today)
+            # Update cached_data for filtering
+            cached_data = _cache.get_line_items(ticker, period)
+
+    # Filter cached data based on line_items, end_date, and limit
+    if not cached_data:
         return []
-
-    # Cache the results
-    return search_results[:limit]
+
+    filtered_items = []
+    for item in cached_data:
+        item_report_period = item.get("report_period", "")
+
+        # Filter by end_date (report_period <= end_date)
+        if item_report_period > end_date:
+            continue
+
+        # Filter by requested line_items (check if any requested line_item exists in the item)
+        item_dict = dict(item)
+        has_requested_line_item = False
+        for requested_item in line_items:
+            # Check if the requested line_item exists as a key in the item
+            if requested_item in item_dict and item_dict[requested_item] is not None:
+                has_requested_line_item = True
+                break
+
+        if not has_requested_line_item:
+            continue
+
+        filtered_items.append(item)
+
+    # Sort by report_period descending (newest first)
+    filtered_items.sort(key=lambda x: x.get("report_period", ""), reverse=True)
+
+    # Apply limit
+    filtered_items = filtered_items[:limit]
+
+    return [LineItem(**item) for item in filtered_items]
 
 
 def get_insider_trades(
@@ -172,58 +288,93 @@
     ticker: str,
     end_date: str,
     start_date: str | None = None,
     limit: int = 1000,
     api_key: str = None,
 ) -> list[InsiderTrade]:
     """Fetch insider trades from cache or API."""
-    # Create a cache key that includes all parameters to ensure exact matches
-    cache_key = f"{ticker}_{start_date or 'none'}_{end_date}_{limit}"
+    today = datetime.datetime.now().strftime("%Y-%m-%d")
 
-    # Check cache first - simple exact match
-    if cached_data := _cache.get_insider_trades(cache_key):
-        return [InsiderTrade(**trade) for trade in cached_data]
-
-    # If not in cache, fetch from API
-    headers = {}
-    financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
-    if financial_api_key:
-        headers["X-API-KEY"] = financial_api_key
+    # Check cache first (only by ticker, no start_date/end_date/limit)
+    cached_data = _cache.get_insider_trades(ticker)
+    # Use last_updated_date (query date) instead of data's latest date
+    latest_cached_date = _cache.get_last_updated_date("insider_trades", ticker)
+
+    # Calculate one year ago date for default fetch
+    one_year_ago = (datetime.datetime.now() - datetime.timedelta(days=365)).strftime("%Y-%m-%d")
+
+    # Check if we need to refresh cache
+    # Refresh if cache doesn't exist or last query date is not today
+    need_refresh = latest_cached_date is None or latest_cached_date != today
+
+    # If cache exists and doesn't need refresh, record cache hit and use cache
+    if cached_data and not need_refresh:
+        _cache.record_cache_hit("insider_trades")
+    # If cache needs refresh, fetch default one year of data
+    elif need_refresh:
+        _cache.record_api_call("insider_trades")
+        headers = {}
+        financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
+        if financial_api_key:
+            headers["X-API-KEY"] = financial_api_key
 
-    all_trades = []
-    current_end_date = end_date
+        all_trades = []
+        current_end_date = today
 
-    while True:
-        url = f"https://api.financialdatasets.ai/insider-trades/?ticker={ticker}&filing_date_lte={current_end_date}"
-        if start_date:
-            url += f"&filing_date_gte={start_date}"
-        url += f"&limit={limit}"
+        # Fetch one year of data by default
+        while True:
+            url = f"https://api.financialdatasets.ai/insider-trades/?ticker={ticker}&filing_date_lte={current_end_date}&filing_date_gte={one_year_ago}&limit=1000"
 
-        response = _make_api_request(url, headers)
-        if response.status_code != 200:
-            raise Exception(f"Error fetching data: {ticker} - {response.status_code} - {response.text}")
+            response = _make_api_request(url, headers)
+            if response.status_code != 200:
+                raise Exception(f"Error fetching data: {ticker} - {response.status_code} - {response.text}")
 
-        data = response.json()
-        response_model = InsiderTradeResponse(**data)
-        insider_trades = response_model.insider_trades
+            data = response.json()
+            response_model = InsiderTradeResponse(**data)
+            insider_trades = response_model.insider_trades
 
-        if not insider_trades:
-            break
+            if not insider_trades:
+                break
 
-        all_trades.extend(insider_trades)
+            all_trades.extend(insider_trades)
 
-        # Only continue pagination if we have a start_date and got a full page
-        if not start_date or len(insider_trades) < limit:
-            break
+            # Check if we got a full page
+            if len(insider_trades) < 1000:
+                break
 
-        # Update end_date to the oldest filing date from current batch for next iteration
-        current_end_date = min(trade.filing_date for trade in insider_trades).split("T")[0]
+            # Update end_date to the oldest filing date from current batch for next iteration
+            current_end_date = min(trade.filing_date for trade in insider_trades).split("T")[0]
 
-        # If we've reached or passed the start_date, we can stop
-        if current_end_date <= start_date:
-            break
+            # If we've reached or passed the start_date, we can stop
+            if current_end_date <= one_year_ago:
+                break
 
-    if not all_trades:
+        if all_trades:
+            # Cache the results (only ticker in cache key) and update last_updated_date
+            _cache.set_insider_trades(ticker, [trade.model_dump() for trade in all_trades], update_date=today)
+            # Update cached_data for filtering
+            cached_data = _cache.get_insider_trades(ticker)
+
+    # Filter cached data based on start_date, end_date, and limit
+    if not cached_data:
         return []
-
-    # Cache the results using the comprehensive cache key
-    _cache.set_insider_trades(cache_key, [trade.model_dump() for trade in all_trades])
-    return all_trades
+
+    filtered_trades = []
+    for trade in cached_data:
+        filing_date = trade.get("filing_date", "")
+        # Extract date part if it includes time
+        trade_date = filing_date.split("T")[0] if "T" in filing_date else filing_date
+
+        # Filter by date range
+        if trade_date > end_date:
+            continue
+        if start_date and trade_date < start_date:
+            continue
+
+        filtered_trades.append(trade)
+
+    # Sort by filing_date descending (newest first) to match cache order
+    filtered_trades.sort(key=lambda x: x.get("filing_date", ""), reverse=True)
+
+    # Apply limit
+    filtered_trades = filtered_trades[:limit]
+
+    return [InsiderTrade(**trade) for trade in filtered_trades]
 
 
 def get_company_news(
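
Reviewer note, not part of the patch: the one-year backfill above pages through the API in chunks of up to 1000 rows. A schematic, standalone sketch of that pagination loop; fetch_page is a hypothetical stand-in for the _make_api_request call and is assumed to return rows newest-first with a "filing_date" field:

# Schematic only; mirrors the while-loop in the hunk above.
def backfill_one_year(fetch_page, today: str, one_year_ago: str) -> list[dict]:
    all_rows: list[dict] = []
    current_end = today
    while True:
        rows = fetch_page(filing_date_lte=current_end, filing_date_gte=one_year_ago, limit=1000)
        if not rows:
            break
        all_rows.extend(rows)
        if len(rows) < 1000:  # partial page means nothing older is left
            break
        # Continue from the oldest filing date seen so far.
        current_end = min(r["filing_date"] for r in rows).split("T")[0]
        if current_end <= one_year_ago:
            break
    return all_rows
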
@@ -234,58 +385,93 @@
     ticker: str,
     end_date: str,
     start_date: str | None = None,
     limit: int = 1000,
     api_key: str = None,
 ) -> list[CompanyNews]:
     """Fetch company news from cache or API."""
-    # Create a cache key that includes all parameters to ensure exact matches
-    cache_key = f"{ticker}_{start_date or 'none'}_{end_date}_{limit}"
+    today = datetime.datetime.now().strftime("%Y-%m-%d")
 
-    # Check cache first - simple exact match
-    if cached_data := _cache.get_company_news(cache_key):
-        return [CompanyNews(**news) for news in cached_data]
-
-    # If not in cache, fetch from API
-    headers = {}
-    financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
-    if financial_api_key:
-        headers["X-API-KEY"] = financial_api_key
+    # Check cache first (only by ticker, no start_date/end_date/limit)
+    cached_data = _cache.get_company_news(ticker)
+    # Use last_updated_date (query date) instead of data's latest date
+    latest_cached_date = _cache.get_last_updated_date("company_news", ticker)
+
+    # Calculate one year ago date for default fetch
+    one_year_ago = (datetime.datetime.now() - datetime.timedelta(days=365)).strftime("%Y-%m-%d")
+
+    # Check if we need to refresh cache
+    # Refresh if cache doesn't exist or last query date is not today
+    need_refresh = latest_cached_date is None or latest_cached_date != today
+
+    # If cache exists and doesn't need refresh, record cache hit and use cache
+    if cached_data and not need_refresh:
+        _cache.record_cache_hit("company_news")
+    # If cache needs refresh, fetch default one year of data
+    elif need_refresh:
+        _cache.record_api_call("company_news")
+        headers = {}
+        financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
+        if financial_api_key:
+            headers["X-API-KEY"] = financial_api_key
 
-    all_news = []
-    current_end_date = end_date
+        all_news = []
+        current_end_date = today
 
-    while True:
= f"https://api.financialdatasets.ai/news/?ticker={ticker}&end_date={current_end_date}" - if start_date: - url += f"&start_date={start_date}" - url += f"&limit={limit}" + # Fetch one year of data by default + while True: + url = f"https://api.financialdatasets.ai/news/?ticker={ticker}&end_date={current_end_date}&start_date={one_year_ago}&limit=1000" - response = _make_api_request(url, headers) - if response.status_code != 200: - raise Exception(f"Error fetching data: {ticker} - {response.status_code} - {response.text}") + response = _make_api_request(url, headers) + if response.status_code != 200: + raise Exception(f"Error fetching data: {ticker} - {response.status_code} - {response.text}") - data = response.json() - response_model = CompanyNewsResponse(**data) - company_news = response_model.news + data = response.json() + response_model = CompanyNewsResponse(**data) + company_news = response_model.news - if not company_news: - break + if not company_news: + break - all_news.extend(company_news) + all_news.extend(company_news) - # Only continue pagination if we have a start_date and got a full page - if not start_date or len(company_news) < limit: - break + # Check if we got a full page + if len(company_news) < 1000: + break - # Update end_date to the oldest date from current batch for next iteration - current_end_date = min(news.date for news in company_news).split("T")[0] + # Update end_date to the oldest date from current batch for next iteration + current_end_date = min(news.date for news in company_news).split("T")[0] - # If we've reached or passed the start_date, we can stop - if current_end_date <= start_date: - break + # If we've reached or passed the start_date, we can stop + if current_end_date <= one_year_ago: + break - if not all_news: + if all_news: + # Cache the results (only ticker in cache key) and update last_updated_date + _cache.set_company_news(ticker, [news.model_dump() for news in all_news], update_date=today) + # Update cached_data for filtering + cached_data = _cache.get_company_news(ticker) + + # Filter cached data based on start_date, end_date, and limit + if not cached_data: return [] - - # Cache the results using the comprehensive cache key - _cache.set_company_news(cache_key, [news.model_dump() for news in all_news]) - return all_news + + filtered_news = [] + for news in cached_data: + news_date = news.get("date", "") + # Extract date part if it includes time + article_date = news_date.split("T")[0] if "T" in news_date else news_date + + # Filter by date range + if article_date > end_date: + continue + if start_date and article_date < start_date: + continue + + filtered_news.append(news) + + # Sort by date descending (newest first) to match cache order + filtered_news.sort(key=lambda x: x.get("date", ""), reverse=True) + + # Apply limit + filtered_news = filtered_news[:limit] + + return [CompanyNews(**news) for news in filtered_news] def get_market_cap( @@ -293,25 +479,50 @@ def get_market_cap( end_date: str, api_key: str = None, ) -> float | None: - """Fetch market cap from the API.""" - # Check if end_date is today - if end_date == datetime.datetime.now().strftime("%Y-%m-%d"): - # Get the market cap from company facts API - headers = {} - financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY") - if financial_api_key: - headers["X-API-KEY"] = financial_api_key - - url = f"https://api.financialdatasets.ai/company/facts/?ticker={ticker}" - response = _make_api_request(url, headers) - if response.status_code != 200: - print(f"Error fetching 
-            print(f"Error fetching company facts: {ticker} - {response.status_code}")
+    """Fetch market cap from cache or API."""
+    today = datetime.datetime.now().strftime("%Y-%m-%d")
+
+    # Check cache first - try to get market cap for the specific end_date
+    cached_market_cap = _cache.get_market_cap_by_date(ticker, end_date)
+    if cached_market_cap is not None:
+        _cache.record_cache_hit("market_cap")
+        return cached_market_cap
+
+    # Cache doesn't have data for end_date, check if we need to refresh
+    latest_cached_date = _cache.get_latest_market_cap_date(ticker)
+    need_refresh = latest_cached_date is None or latest_cached_date != today
+
+    # If end_date is today, check if we need to refresh cache
+    if end_date == today:
+        # If cache doesn't exist or latest date is not today, refresh
+        if need_refresh:
+            _cache.record_api_call("market_cap")
+            headers = {}
+            financial_api_key = api_key or os.environ.get("FINANCIAL_DATASETS_API_KEY")
+            if financial_api_key:
+                headers["X-API-KEY"] = financial_api_key
+
+            url = f"https://api.financialdatasets.ai/company/facts/?ticker={ticker}"  # free
+            response = _make_api_request(url, headers)
+            if response.status_code != 200:
+                print(f"Error fetching company facts: {ticker} - {response.status_code}")
+                return None
+
+            data = response.json()
+            response_model = CompanyFactsResponse(**data)
+            market_cap = response_model.company_facts.market_cap
+
+            # Cache the result
+            if market_cap is not None:
+                _cache.set_market_cap(ticker, [{"date": today, "market_cap": market_cap}])
+
+            return market_cap
+        else:
+            # Cache exists and latest date is today, but no data for today
+            # This shouldn't happen, but return None if it does
             return None
-
-        data = response.json()
-        response_model = CompanyFactsResponse(**data)
-        return response_model.company_facts.market_cap
-
+
+    # For historical dates, fetch from financial_metrics API
     financial_metrics = get_financial_metrics(ticker, end_date, api_key=api_key)
     if not financial_metrics:
         return None
@@ -321,6 +532,9 @@ def get_market_cap(
     market_cap = financial_metrics[0].market_cap
 
     if not market_cap:
         return None
 
+    # Cache the result
+    _cache.set_market_cap(ticker, [{"date": end_date, "market_cap": market_cap}])
+
     return market_cap
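
Reviewer note, not part of the patch: a closing sketch of how the market-cap cache introduced above is meant to be queried by date. The ticker, date, and value are illustrative; get_cache() is the existing module-level factory in src/data/cache.py.

# Illustrative only: the market-cap cache stores {"date", "market_cap"} rows,
# newest first, and get_market_cap_by_date() does an exact-date lookup.
from src.data.cache import get_cache

cache = get_cache()
cache.set_market_cap("AAPL", [{"date": "2024-05-01", "market_cap": 2.9e12}])

print(cache.get_market_cap_by_date("AAPL", "2024-05-01"))  # 2.9e12 (served from cache)
print(cache.get_market_cap_by_date("AAPL", "2024-04-30"))  # None (api.py then falls through to the API path)
print(cache.get_latest_market_cap_date("AAPL"))            # "2024-05-01"
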