diff --git a/.gitignore b/.gitignore index cb0f8dc..28da051 100644 --- a/.gitignore +++ b/.gitignore @@ -201,3 +201,7 @@ __marimo__/ # Streamlit .streamlit/secrets.toml + +# Claude +PLAN.md +CLAUDE.md diff --git a/README.md b/README.md index 868bbb5..770dc73 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,62 @@ It-Depends can automatically try to match packages against the [OSV vulnerabilit `--audit` option. This is a best-effort matching as it is based on package names, which might not always consistent. Any discovered vulnerabilities are added to the JSON output. +### Checking Package Maintenance Status + +It-Depends can check the maintenance status of GitHub-hosted packages using the `--check-maintenance` option. This feature +queries the GitHub API to determine when each package was last updated, helping identify stale or unmaintained dependencies +in your software supply chain. + +```shell +# Check maintenance status with default threshold (365 days) +it-depends pip:requests --check-maintenance + +# Use custom staleness threshold (180 days) +it-depends npm:lodash --check-maintenance --stale-threshold 180 + +# Provide GitHub token for higher rate limits +export GITHUB_TOKEN=your_token_here +it-depends pip:requests --check-maintenance + +# Or pass token directly +it-depends pip:requests --check-maintenance --github-token your_token_here +``` + +The maintenance check adds a `maintenance` field to each package in the JSON output: + +```json +{ + "pip:requests": { + "2.31.0": { + "name": "requests", + "version": "2.31.0", + "source": "pip", + "maintenance": { + "repository_url": "https://github.com/psf/requests", + "last_commit_date": "2023-05-22T14:30:00Z", + "is_stale": false, + "days_since_update": 120, + "error": null + } + } + } +} +``` + +**GitHub API Rate Limits:** +- Unauthenticated requests: 60 requests/hour +- Authenticated requests (with token): 5,000 requests/hour +- Maintenance data is cached for 24 hours by default (configurable via `--maintenance-cache-ttl`) + +**Supported Package Managers:** +The maintenance check currently works with packages that have GitHub repositories: +- **npm**: Queries repository field from package.json +- **pip**: Checks project URLs from PyPI metadata +- **cargo**: Uses repository field from Crates.io +- **go**: Extracts from import paths (e.g., github.com/user/repo) + +Packages not hosted on GitHub will have an error message in the maintenance field but will not prevent the analysis from completing. + It-Depends attempts to parallelize as much of its effort as possible. To limit the maximum number of parallel tasks, use the `--max-workers` option. diff --git a/src/it_depends/_cli.py b/src/it_depends/_cli.py index d43e4d1..72ca804 100644 --- a/src/it_depends/_cli.py +++ b/src/it_depends/_cli.py @@ -126,6 +126,17 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915 if settings.audit: package_list = vulnerabilities(package_list) + if settings.check_maintenance: + from .maintenance import check_maintenance_status + + package_list = check_maintenance_status( + package_list, + stale_threshold_days=settings.stale_threshold, + github_token=settings.github_token or os.getenv("GITHUB_TOKEN"), + cache=cache, + cache_ttl=settings.maintenance_cache_ttl, + ) + if to_compare is not None: to_compare_list = resolve( to_compare, diff --git a/src/it_depends/cargo.py b/src/it_depends/cargo.py index e86c3a1..16d7ee1 100644 --- a/src/it_depends/cargo.py +++ b/src/it_depends/cargo.py @@ -10,6 +10,8 @@ from pathlib import Path from typing import TYPE_CHECKING +import requests + if TYPE_CHECKING: from collections.abc import Iterator @@ -184,3 +186,26 @@ def resolve(self, dependency: Dependency) -> Iterator[Package]: cache.set_resolved(dependency) # TODO(@evandowning): propagate up any other info we have in this cache # noqa: TD003, FIX002 return cache.match(dependency) + + @staticmethod + def get_repository_url(package: Package) -> str | None: + """Get GitHub repository URL for Cargo package. + + Args: + package: Package to get repository URL for + + Returns: + Repository URL or None if not found + + """ + try: + response = requests.get( + f"https://crates.io/api/v1/crates/{package.name}", + timeout=5, + ) + if response.status_code == 200: + data = response.json() + return data.get("crate", {}).get("repository") + return None + except requests.RequestException: + return None diff --git a/src/it_depends/config.py b/src/it_depends/config.py index 677b42c..c6a4220 100644 --- a/src/it_depends/config.py +++ b/src/it_depends/config.py @@ -45,6 +45,30 @@ class Settings(BaseSettings): default=False, description="""Audit packages for known vulnerabilities using Google OSV.""", ) + check_maintenance: CliImplicitFlag[bool] = Field( + alias="check-maintenance", + default=False, + description="""Check maintenance status of GitHub-hosted packages. + Queries GitHub API for last commit date and flags stale packages.""", + ) + stale_threshold: int = Field( + alias="stale-threshold", + default=365, + description="""Days since last commit to consider a package stale. + Default: 365 (1 year). Requires --check-maintenance.""", + ) + github_token: str | None = Field( + alias="github-token", + default=None, + description="""GitHub personal access token for API requests. + If not provided, uses GITHUB_TOKEN environment variable. + Authenticated: 5000 requests/hour. Unauthenticated: 60 requests/hour.""", + ) + maintenance_cache_ttl: int = Field( + alias="maintenance-cache-ttl", + default=86400, + description="""Cache TTL for maintenance data in seconds. Default: 86400 (24 hours).""", + ) database: Path = Field( default=DEFAULT_DB_PATH, description="""Alternative path to load/store the database, or diff --git a/src/it_depends/db.py b/src/it_depends/db.py index 224ea9e..27698ed 100644 --- a/src/it_depends/db.py +++ b/src/it_depends/db.py @@ -68,6 +68,19 @@ class Updated(Base): __table_args__ = (UniqueConstraint("package", "version", "source", "resolver", name="updated_unique_constraint"),) +class GitHubMetadataCache(Base): + """Cache for GitHub repository metadata.""" + + __tablename__ = "github_metadata_cache" + + owner = Column(String, nullable=False, primary_key=True) + repo = Column(String, nullable=False, primary_key=True) + pushed_at = Column(String, nullable=True) # ISO 8601 + fetched_at = Column(String, nullable=False) # ISO 8601 + + __table_args__ = (UniqueConstraint("owner", "repo", name="github_cache_unique"),) + + class DBDependency(Base, Dependency): """Database model for dependencies.""" diff --git a/src/it_depends/go.py b/src/it_depends/go.py index 98c0e09..c6e7328 100644 --- a/src/it_depends/go.py +++ b/src/it_depends/go.py @@ -494,3 +494,26 @@ def resolve_from_source(self, repo: SourceRepository, cache: object | None = Non for package, version in module.dependencies ], ) + + @staticmethod + def get_repository_url(package: Package) -> str | None: + """Get GitHub repository URL for Go package. + + For Go packages, the package name often IS the repository path. + For example: github.com/user/repo + + Args: + package: Package to get repository URL for + + Returns: + Repository URL or None if not a GitHub package + + """ + if package.name.startswith("github.com/"): + # Extract owner/repo from path like github.com/owner/repo/subpath + parts = package.name.split("/") + if len(parts) >= 3: + owner = parts[1] + repo = parts[2] + return f"https://github.com/{owner}/{repo}" + return None diff --git a/src/it_depends/maintenance.py b/src/it_depends/maintenance.py new file mode 100644 index 0000000..6d93ffe --- /dev/null +++ b/src/it_depends/maintenance.py @@ -0,0 +1,368 @@ +"""Package maintenance status checking functionality.""" + +from __future__ import annotations + +import logging +import os +import re +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from requests import Session +from tqdm import tqdm + +if TYPE_CHECKING: + from .cache import DBPackageCache + from .dependencies import Package, PackageRepository + from .models import MaintenanceInfo + +logger = logging.getLogger(__name__) + + +def extract_github_repo(url: str) -> tuple[str, str] | None: + """Extract owner and repo from GitHub URL. + + Args: + url: GitHub repository URL in various formats + + Returns: + Tuple of (owner, repo) or None if not a GitHub URL + + Examples: + >>> extract_github_repo("https://github.com/owner/repo") + ('owner', 'repo') + >>> extract_github_repo("git@github.com:owner/repo.git") + ('owner', 'repo') + >>> extract_github_repo("https://github.com/owner/repo.git") + ('owner', 'repo') + + """ + if not url: + return None + + # Handle various URL formats + patterns = [ + r"github\.com[:/]([^/]+)/([^/\.]+)(?:\.git)?", # HTTPS or SSH + r"github\.com/([^/]+)/([^/]+?)(?:\.git)?$", # With optional .git suffix + ] + + for pattern in patterns: + match = re.search(pattern, url) + if match: + owner, repo = match.groups() + # Remove .git suffix if present + repo = repo.rstrip(".git") + return (owner, repo) + + return None + + +class GitHubClient: + """Client for GitHub API interactions.""" + + API_BASE = "https://api.github.com" + + def __init__(self, token: str | None = None) -> None: + """Initialize GitHub API client. + + Args: + token: GitHub personal access token for authentication + + """ + self.session = Session() + if token: + self.session.headers["Authorization"] = f"token {token}" + self.session.headers["Accept"] = "application/vnd.github.v3+json" + self.remaining_requests: int | None = None + self.reset_time: int | None = None + + def fetch_repo_metadata(self, owner: str, repo: str) -> dict | None: + """Fetch repository metadata from GitHub API. + + Args: + owner: Repository owner + repo: Repository name + + Returns: + Repository metadata dict or None if failed + + """ + try: + response = self.session.get( + f"{self.API_BASE}/repos/{owner}/{repo}", + timeout=10, + ) + + # Track rate limits + self.remaining_requests = int(response.headers.get("X-RateLimit-Remaining", 0)) + self.reset_time = int(response.headers.get("X-RateLimit-Reset", 0)) + + if self.remaining_requests is not None and self.remaining_requests < 10: + logger.warning( + f"GitHub API rate limit low: {self.remaining_requests} requests remaining" + ) + + if response.status_code == 404: + logger.debug(f"Repository not found: {owner}/{repo}") + return None + + if response.status_code == 403: + logger.warning("GitHub API rate limit exceeded") + return None + + response.raise_for_status() + return response.json() + + except Exception as e: + logger.debug(f"GitHub API error for {owner}/{repo}: {e}") + return None + + def extract_maintenance_date(self, metadata: dict) -> str | None: + """Extract last maintenance date from repo metadata. + + Args: + metadata: Repository metadata from GitHub API + + Returns: + ISO 8601 timestamp string or None + + """ + # Use pushed_at (reflects actual code commits) + pushed_at = metadata.get("pushed_at") + if pushed_at: + return pushed_at + + # Fallback to created_at if no pushes + created_at = metadata.get("created_at") + if created_at: + return created_at + + return None + + +def check_maintenance_status( + repo: PackageRepository, + stale_threshold_days: int = 365, + github_token: str | None = None, + cache: DBPackageCache | None = None, + cache_ttl: int = 86400, + nworkers: int | None = None, +) -> PackageRepository: + """Enrich packages with maintenance information. + + Args: + repo: Package repository to enrich + stale_threshold_days: Days threshold for staleness + github_token: GitHub API token + cache: Database cache for GitHub metadata + cache_ttl: Cache TTL in seconds + nworkers: Number of worker threads + + Returns: + Enriched package repository + + """ + from .models import MaintenanceInfo + + github_client = GitHubClient(token=github_token) + + def _check_package_maintenance(pkg: Package) -> tuple[Package, MaintenanceInfo]: + """Check maintenance status for a single package.""" + # Try to get repository URL from package resolver + repo_url = None + try: + if hasattr(pkg.resolver, "get_repository_url"): + repo_url = pkg.resolver.get_repository_url(pkg) + except Exception as e: + logger.debug(f"Failed to get repo URL for {pkg.name}: {e}") + + if not repo_url: + return ( + pkg, + MaintenanceInfo(error="No GitHub repository URL found"), + ) + + # Extract owner/repo from URL + github_info = extract_github_repo(repo_url) + if not github_info: + return ( + pkg, + MaintenanceInfo( + repository_url=repo_url, + error="Repository not hosted on GitHub", + ), + ) + + owner, repo_name = github_info + + # Check cache first (if available) + if cache: + cached = _get_cached_metadata(cache, owner, repo_name, cache_ttl, stale_threshold_days) + if cached: + return (pkg, cached) + + # Fetch from GitHub API + metadata = github_client.fetch_repo_metadata(owner, repo_name) + if not metadata: + return ( + pkg, + MaintenanceInfo( + repository_url=repo_url, + error="Failed to fetch repository metadata", + ), + ) + + # Extract maintenance date + last_commit_str = github_client.extract_maintenance_date(metadata) + if not last_commit_str: + return ( + pkg, + MaintenanceInfo( + repository_url=repo_url, + error="No commit date found", + ), + ) + + # Calculate staleness + try: + last_commit = datetime.fromisoformat(last_commit_str.replace("Z", "+00:00")) + now = datetime.now(timezone.utc) + days_since = (now - last_commit).days + is_stale = days_since > stale_threshold_days + except (ValueError, AttributeError) as e: + logger.debug(f"Failed to parse date for {owner}/{repo_name}: {e}") + return ( + pkg, + MaintenanceInfo( + repository_url=repo_url, + error="Failed to parse commit date", + ), + ) + + maintenance_info = MaintenanceInfo( + repository_url=repo_url, + last_commit_date=last_commit_str, + is_stale=is_stale, + days_since_update=days_since, + ) + + # Cache result + if cache: + _cache_metadata(cache, owner, repo_name, last_commit_str) + + return (pkg, maintenance_info) + + # Process packages in parallel + with ( + ThreadPoolExecutor(max_workers=nworkers) as executor, + tqdm(desc="Checking maintenance status", leave=False, unit=" packages") as t, + ): + futures = {executor.submit(_check_package_maintenance, pkg): pkg for pkg in repo} + t.total = len(futures) + + for future in as_completed(futures): + try: + t.update(1) + pkg, maintenance_info = future.result() + pkg.update_maintenance_info(maintenance_info) + except Exception: + logger.exception("Failed to check maintenance status") + + # Log summary + stale_count = sum(1 for pkg in repo if pkg.maintenance_info and pkg.maintenance_info.is_stale) + if stale_count > 0: + logger.info( + f"Found {stale_count} stale packages (>{stale_threshold_days} days since update)" + ) + + return repo + + +def _get_cached_metadata( + cache: DBPackageCache, + owner: str, + repo: str, + ttl: int, + stale_threshold_days: int, +) -> MaintenanceInfo | None: + """Get cached GitHub metadata if still valid. + + Args: + cache: Database cache + owner: Repository owner + repo: Repository name + ttl: Cache time-to-live in seconds + stale_threshold_days: Threshold for staleness calculation + + Returns: + MaintenanceInfo if cache is valid, None otherwise + + """ + from .db import GitHubMetadataCache + from .models import MaintenanceInfo + + try: + result = ( + cache.session.query(GitHubMetadataCache) + .filter( + GitHubMetadataCache.owner == owner, + GitHubMetadataCache.repo == repo, + ) + .first() + ) + + if not result: + return None + + # Check if cache is still valid + fetched_at = datetime.fromisoformat(result.fetched_at) + if (datetime.now(timezone.utc) - fetched_at).total_seconds() > ttl: + return None + + # Reconstruct MaintenanceInfo from cache + if result.pushed_at: + last_commit = datetime.fromisoformat(result.pushed_at.replace("Z", "+00:00")) + days_since = (datetime.now(timezone.utc) - last_commit).days + return MaintenanceInfo( + repository_url=f"https://github.com/{owner}/{repo}", + last_commit_date=result.pushed_at, + is_stale=days_since > stale_threshold_days, + days_since_update=days_since, + ) + + return None + except Exception as e: + logger.debug(f"Cache lookup failed: {e}") + return None + + +def _cache_metadata( + cache: DBPackageCache, + owner: str, + repo: str, + pushed_at: str, +) -> None: + """Store GitHub metadata in cache. + + Args: + cache: Database cache + owner: Repository owner + repo: Repository name + pushed_at: ISO 8601 timestamp of last push + + """ + from .db import GitHubMetadataCache + + try: + cache.session.merge( + GitHubMetadataCache( + owner=owner, + repo=repo, + pushed_at=pushed_at, + fetched_at=datetime.now(timezone.utc).isoformat(), + ) + ) + cache.session.commit() + except Exception as e: + logger.debug(f"Failed to cache metadata: {e}") diff --git a/src/it_depends/models.py b/src/it_depends/models.py index 1d6e797..21be2dd 100644 --- a/src/it_depends/models.py +++ b/src/it_depends/models.py @@ -55,6 +55,58 @@ def __lt__(self, other: object) -> bool: return self.id < other.id +class MaintenanceInfo: + """Represents maintenance status information for a package.""" + + def __init__( + self, + repository_url: str | None = None, + last_commit_date: str | None = None, + is_stale: bool = False, + days_since_update: int | None = None, + error: str | None = None, + ) -> None: + """Initialize maintenance information. + + Args: + repository_url: GitHub repository URL + last_commit_date: ISO 8601 timestamp of last commit (from pushed_at) + is_stale: Whether package exceeds staleness threshold + days_since_update: Days since last update + error: Error message if check failed + + """ + self.repository_url = repository_url + self.last_commit_date = last_commit_date + self.is_stale = is_stale + self.days_since_update = days_since_update + self.error = error + + def to_obj(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "repository_url": self.repository_url, + "last_commit_date": self.last_commit_date, + "is_stale": self.is_stale, + "days_since_update": self.days_since_update, + "error": self.error, + } + + def __eq__(self, other: object) -> bool: + """Check equality with another MaintenanceInfo.""" + if isinstance(other, MaintenanceInfo): + return ( + self.repository_url == other.repository_url + and self.last_commit_date == other.last_commit_date + and self.is_stale == other.is_stale + ) + return False + + def __hash__(self) -> int: + """Compute hash for MaintenanceInfo.""" + return hash((self.repository_url, self.last_commit_date, self.is_stale)) + + class Dependency: """Represents a dependency with package name, source, and version constraints.""" @@ -221,6 +273,7 @@ def __init__( source: str | DependencyResolver, dependencies: Iterable[Dependency] = (), vulnerabilities: Iterable[Vulnerability] = (), + maintenance_info: MaintenanceInfo | None = None, ) -> None: """Initialize a package. @@ -230,6 +283,7 @@ def __init__( source: Source resolver name or resolver instance dependencies: Package dependencies vulnerabilities: Known vulnerabilities + maintenance_info: Package maintenance information """ if isinstance(version, str): @@ -244,6 +298,7 @@ def __init__( else: self.source = source self.vulnerabilities: frozenset[Vulnerability] = frozenset(vulnerabilities) + self.maintenance_info: MaintenanceInfo | None = maintenance_info @property def full_name(self) -> str: @@ -276,6 +331,19 @@ def update_vulnerabilities(self, vulnerabilities: frozenset[Vulnerability]) -> P self.vulnerabilities = self.vulnerabilities.union(vulnerabilities) return self + def update_maintenance_info(self, info: MaintenanceInfo) -> Package: + """Update package maintenance information. + + Args: + info: Maintenance information to add + + Returns: + Self for method chaining + + """ + self.maintenance_info = info + return self + @property def resolver(self) -> DependencyResolver: """Get the initial main resolver for this package. @@ -339,13 +407,16 @@ def to_dependency(self) -> Dependency: def to_obj(self) -> dict[str, Any]: """Convert package to dictionary representation.""" - return { + obj = { "source": self.source, "name": self.name, "version": str(self.version), "dependencies": {f"{dep.source}:{dep.package}": str(dep.semantic_version) for dep in self.dependencies}, "vulnerabilities": [vuln.to_obj() for vuln in self.vulnerabilities], } + if self.maintenance_info: + obj["maintenance"] = self.maintenance_info.to_obj() + return obj def dumps(self) -> str: """Serialize package to JSON string.""" diff --git a/src/it_depends/npm.py b/src/it_depends/npm.py index babcc36..1f48889 100644 --- a/src/it_depends/npm.py +++ b/src/it_depends/npm.py @@ -216,6 +216,30 @@ def docker_setup(self) -> DockerSetup: baseline_script='#!/usr/bin/env node -e ""\n', ) + @staticmethod + def get_repository_url(package: Package) -> str | None: + """Get GitHub repository URL for NPM package. + + Args: + package: Package to get repository URL for + + Returns: + Repository URL or None if not found + + """ + try: + result = subprocess.check_output( + ["npm", "view", "--json", package.name, "repository"], + timeout=5, + stderr=subprocess.DEVNULL, + ) + repo_info = json.loads(result) + if isinstance(repo_info, dict): + return repo_info.get("url") + return None + except (subprocess.CalledProcessError, json.JSONDecodeError, subprocess.TimeoutExpired): + return None + def generate_dependency_from_information( package_name: str, diff --git a/src/it_depends/pip.py b/src/it_depends/pip.py index 7a0ffe6..03ed7e8 100644 --- a/src/it_depends/pip.py +++ b/src/it_depends/pip.py @@ -10,6 +10,8 @@ from tempfile import TemporaryDirectory from typing import TYPE_CHECKING +import requests + if TYPE_CHECKING: from collections.abc import Iterable, Iterator @@ -236,6 +238,34 @@ def resolve(self, dependency: Dependency) -> Iterator[Package]: log.warning(str(e)) return iter(()) + @staticmethod + def get_repository_url(package: Package) -> str | None: + """Get GitHub repository URL for PyPI package. + + Args: + package: Package to get repository URL for + + Returns: + Repository URL or None if not found + + """ + try: + response = requests.get( + f"https://pypi.org/pypi/{package.name}/json", + timeout=5, + ) + if response.status_code == 200: + data = response.json() + project_urls = data.get("info", {}).get("project_urls", {}) + # Try common keys for repository URLs + for key in ["Source", "Repository", "Homepage", "Code", "source"]: + url = project_urls.get(key) + if url and "github.com" in url: + return url + return None + except requests.RequestException: + return None + class PipSourcePackage(SourcePackage): """Source package for Python packages.""" diff --git a/src/it_depends/sbom.py b/src/it_depends/sbom.py index 5688d66..fd357ed 100644 --- a/src/it_depends/sbom.py +++ b/src/it_depends/sbom.py @@ -6,7 +6,7 @@ from cyclonedx.builder.this import this_component as cdx_lib_component from cyclonedx.model import XsUri from cyclonedx.model.bom import Bom -from cyclonedx.model.component import Component, ComponentType +from cyclonedx.model.component import Component, ComponentType, Property from cyclonedx.model.contact import OrganizationalEntity from cyclonedx.output.json import JsonV1Dot5 @@ -19,6 +19,34 @@ S = TypeVar("S", bound="SBOM") +def _add_maintenance_properties(component: Component, package: Package) -> None: + """Add maintenance information as properties to a CycloneDX component. + + Args: + component: CycloneDX component to add properties to + package: Package with maintenance information + + """ + if package.maintenance_info: + info = package.maintenance_info + if info.repository_url: + component.properties.add( + Property(name="maintenance:repository_url", value=info.repository_url) + ) + if info.last_commit_date: + component.properties.add( + Property(name="maintenance:last_commit_date", value=info.last_commit_date) + ) + if info.days_since_update is not None: + component.properties.add( + Property(name="maintenance:days_since_update", value=str(info.days_since_update)) + ) + if info.is_stale: + component.properties.add(Property(name="maintenance:is_stale", value="true")) + if info.error: + component.properties.add(Property(name="maintenance:error", value=info.error)) + + class SBOM: """Software Bill of Materials representation.""" @@ -55,6 +83,7 @@ def to_cyclonedx(self) -> Bom: version=str(root_package.version), bom_ref=root_package.full_name, ) + _add_maintenance_properties(root_component, root_package) bom.components.add(root_component) expanded[root_package] = root_component @@ -79,7 +108,9 @@ def to_cyclonedx(self) -> Bom: version=str(pkg.version), bom_ref=f"{pkg.full_name}@{pkg.version!s}", ) + _add_maintenance_properties(component, pkg) bom.components.add(component) + expanded[pkg] = component else: component = expanded[pkg] if depends_on not in expanded: @@ -89,7 +120,9 @@ def to_cyclonedx(self) -> Bom: version=str(depends_on.version), bom_ref=f"{depends_on.full_name}@{depends_on.version!s}", ) + _add_maintenance_properties(d_component, depends_on) bom.components.add(d_component) + expanded[depends_on] = d_component else: d_component = expanded[depends_on] bom.register_dependency(component, [d_component]) diff --git a/test/test_maintenance.py b/test/test_maintenance.py new file mode 100644 index 0000000..8a1ef2b --- /dev/null +++ b/test/test_maintenance.py @@ -0,0 +1,389 @@ +"""Unit tests for maintenance checking functionality.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from it_depends.maintenance import GitHubClient, extract_github_repo +from it_depends.models import MaintenanceInfo, Package + + +class TestGitHubURLExtraction: + """Tests for extracting owner/repo from GitHub URLs.""" + + def test_https_url(self) -> None: + """Test extraction from HTTPS URL.""" + url = "https://github.com/owner/repo" + result = extract_github_repo(url) + assert result == ("owner", "repo") + + def test_https_url_with_git_suffix(self) -> None: + """Test extraction from HTTPS URL with .git suffix.""" + url = "https://github.com/owner/repo.git" + result = extract_github_repo(url) + assert result == ("owner", "repo") + + def test_ssh_url(self) -> None: + """Test extraction from SSH URL.""" + url = "git@github.com:owner/repo.git" + result = extract_github_repo(url) + assert result == ("owner", "repo") + + def test_ssh_url_without_git_suffix(self) -> None: + """Test extraction from SSH URL without .git.""" + url = "git@github.com:owner/repo" + result = extract_github_repo(url) + assert result == ("owner", "repo") + + def test_url_with_subpath(self) -> None: + """Test extraction handles URLs with subpaths.""" + url = "https://github.com/owner/repo/tree/main" + result = extract_github_repo(url) + assert result == ("owner", "repo") + + def test_non_github_url(self) -> None: + """Test non-GitHub URLs return None.""" + url = "https://bitbucket.org/owner/repo" + result = extract_github_repo(url) + assert result is None + + def test_empty_url(self) -> None: + """Test empty URL returns None.""" + result = extract_github_repo("") + assert result is None + + def test_none_url(self) -> None: + """Test None URL returns None.""" + result = extract_github_repo(None) # type: ignore[arg-type] + assert result is None + + def test_malformed_url(self) -> None: + """Test malformed URL returns None.""" + url = "github.com" + result = extract_github_repo(url) + assert result is None + + +class TestMaintenanceInfo: + """Tests for MaintenanceInfo data class.""" + + def test_initialization_with_all_fields(self) -> None: + """Test MaintenanceInfo initialization with all fields.""" + info = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + last_commit_date="2023-05-22T14:30:00Z", + is_stale=True, + days_since_update=500, + error=None, + ) + assert info.repository_url == "https://github.com/owner/repo" + assert info.last_commit_date == "2023-05-22T14:30:00Z" + assert info.is_stale is True + assert info.days_since_update == 500 + assert info.error is None + + def test_initialization_with_error(self) -> None: + """Test MaintenanceInfo initialization with error.""" + info = MaintenanceInfo(error="Failed to fetch repository metadata") + assert info.repository_url is None + assert info.last_commit_date is None + assert info.is_stale is False + assert info.days_since_update is None + assert info.error == "Failed to fetch repository metadata" + + def test_to_obj_serialization(self) -> None: + """Test serialization to dictionary.""" + info = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + last_commit_date="2023-05-22T14:30:00Z", + is_stale=False, + days_since_update=120, + ) + obj = info.to_obj() + assert obj == { + "repository_url": "https://github.com/owner/repo", + "last_commit_date": "2023-05-22T14:30:00Z", + "is_stale": False, + "days_since_update": 120, + "error": None, + } + + def test_equality(self) -> None: + """Test equality comparison.""" + info1 = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + last_commit_date="2023-05-22T14:30:00Z", + is_stale=True, + ) + info2 = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + last_commit_date="2023-05-22T14:30:00Z", + is_stale=True, + ) + assert info1 == info2 + + def test_inequality(self) -> None: + """Test inequality comparison.""" + info1 = MaintenanceInfo( + repository_url="https://github.com/owner/repo1", + last_commit_date="2023-05-22T14:30:00Z", + ) + info2 = MaintenanceInfo( + repository_url="https://github.com/owner/repo2", + last_commit_date="2023-05-22T14:30:00Z", + ) + assert info1 != info2 + + def test_hash(self) -> None: + """Test hashing for set/dict usage.""" + info1 = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + last_commit_date="2023-05-22T14:30:00Z", + is_stale=True, + ) + info2 = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + last_commit_date="2023-05-22T14:30:00Z", + is_stale=True, + ) + # Same info should have same hash + assert hash(info1) == hash(info2) + # Should be usable in sets + info_set = {info1, info2} + assert len(info_set) == 1 + + +class TestGitHubClient: + """Tests for GitHubClient API interactions.""" + + def test_initialization_without_token(self) -> None: + """Test client initialization without token.""" + client = GitHubClient() + assert "Authorization" not in client.session.headers + assert "Accept" in client.session.headers + + def test_initialization_with_token(self) -> None: + """Test client initialization with token.""" + client = GitHubClient(token="test_token") + assert client.session.headers["Authorization"] == "token test_token" + + @patch("it_depends.maintenance.Session") + def test_fetch_repo_metadata_success(self, mock_session_class: Mock) -> None: + """Test successful repository metadata fetch.""" + # Setup mock + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = { + "X-RateLimit-Remaining": "100", + "X-RateLimit-Reset": "1234567890", + } + mock_response.json.return_value = { + "name": "repo", + "pushed_at": "2023-05-22T14:30:00Z", + "stargazers_count": 1000, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session_class.return_value = mock_session + + client = GitHubClient() + client.session = mock_session + metadata = client.fetch_repo_metadata("owner", "repo") + + assert metadata is not None + assert metadata["name"] == "repo" + assert metadata["pushed_at"] == "2023-05-22T14:30:00Z" + assert client.remaining_requests == 100 + + @patch("it_depends.maintenance.Session") + def test_fetch_repo_metadata_not_found(self, mock_session_class: Mock) -> None: + """Test 404 response for non-existent repository.""" + mock_response = Mock() + mock_response.status_code = 404 + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session_class.return_value = mock_session + + client = GitHubClient() + client.session = mock_session + metadata = client.fetch_repo_metadata("owner", "nonexistent") + + assert metadata is None + + @patch("it_depends.maintenance.Session") + def test_fetch_repo_metadata_rate_limit(self, mock_session_class: Mock) -> None: + """Test 403 response for rate limit exceeded.""" + mock_response = Mock() + mock_response.status_code = 403 + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session_class.return_value = mock_session + + client = GitHubClient() + client.session = mock_session + metadata = client.fetch_repo_metadata("owner", "repo") + + assert metadata is None + + def test_extract_maintenance_date_with_pushed_at(self) -> None: + """Test extracting maintenance date from pushed_at field.""" + client = GitHubClient() + metadata = { + "pushed_at": "2023-05-22T14:30:00Z", + "created_at": "2020-01-01T00:00:00Z", + } + date = client.extract_maintenance_date(metadata) + assert date == "2023-05-22T14:30:00Z" + + def test_extract_maintenance_date_fallback_to_created_at(self) -> None: + """Test fallback to created_at when pushed_at is missing.""" + client = GitHubClient() + metadata = {"created_at": "2020-01-01T00:00:00Z"} + date = client.extract_maintenance_date(metadata) + assert date == "2020-01-01T00:00:00Z" + + def test_extract_maintenance_date_no_dates(self) -> None: + """Test returns None when no dates available.""" + client = GitHubClient() + metadata = {"name": "repo"} + date = client.extract_maintenance_date(metadata) + assert date is None + + +class TestPackageIntegration: + """Tests for Package class integration with maintenance info.""" + + def test_package_with_maintenance_info(self) -> None: + """Test creating package with maintenance info.""" + info = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + last_commit_date="2023-05-22T14:30:00Z", + is_stale=False, + days_since_update=120, + ) + pkg = Package( + name="test-package", + version="1.0.0", + source="npm", + maintenance_info=info, + ) + assert pkg.maintenance_info == info + + def test_package_without_maintenance_info(self) -> None: + """Test creating package without maintenance info.""" + pkg = Package(name="test-package", version="1.0.0", source="npm") + assert pkg.maintenance_info is None + + def test_update_maintenance_info(self) -> None: + """Test updating package with maintenance info.""" + pkg = Package(name="test-package", version="1.0.0", source="npm") + info = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + is_stale=True, + ) + result = pkg.update_maintenance_info(info) + assert pkg.maintenance_info == info + assert result == pkg # Should return self for chaining + + def test_to_obj_includes_maintenance(self) -> None: + """Test package serialization includes maintenance info.""" + info = MaintenanceInfo( + repository_url="https://github.com/owner/repo", + last_commit_date="2023-05-22T14:30:00Z", + is_stale=True, + days_since_update=500, + ) + pkg = Package( + name="test-package", + version="1.0.0", + source="npm", + maintenance_info=info, + ) + obj = pkg.to_obj() + assert "maintenance" in obj + assert obj["maintenance"]["repository_url"] == "https://github.com/owner/repo" + assert obj["maintenance"]["is_stale"] is True + + def test_to_obj_without_maintenance(self) -> None: + """Test package serialization without maintenance info.""" + pkg = Package(name="test-package", version="1.0.0", source="npm") + obj = pkg.to_obj() + assert "maintenance" not in obj + + +class TestResolverURLExtraction: + """Tests for resolver get_repository_url methods.""" + + @patch("it_depends.npm.subprocess.check_output") + def test_npm_get_repository_url(self, mock_subprocess: Mock) -> None: + """Test NPM resolver repository URL extraction.""" + from it_depends.npm import NPMResolver + + mock_subprocess.return_value = b'{"url": "https://github.com/lodash/lodash"}' + + pkg = Package(name="lodash", version="4.17.21", source="npm") + url = NPMResolver.get_repository_url(pkg) + + assert url == "https://github.com/lodash/lodash" + + @patch("it_depends.pip.requests.get") + def test_pip_get_repository_url(self, mock_get: Mock) -> None: + """Test Pip resolver repository URL extraction.""" + from it_depends.pip import PipResolver + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "info": { + "project_urls": { + "Source": "https://github.com/psf/requests", + } + } + } + mock_get.return_value = mock_response + + pkg = Package(name="requests", version="2.31.0", source="pip") + url = PipResolver.get_repository_url(pkg) + + assert url == "https://github.com/psf/requests" + + @patch("it_depends.cargo.requests.get") + def test_cargo_get_repository_url(self, mock_get: Mock) -> None: + """Test Cargo resolver repository URL extraction.""" + from it_depends.cargo import CargoResolver + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "crate": {"repository": "https://github.com/rust-lang/cargo"} + } + mock_get.return_value = mock_response + + pkg = Package(name="cargo", version="0.1.0", source="cargo") + url = CargoResolver.get_repository_url(pkg) + + assert url == "https://github.com/rust-lang/cargo" + + def test_go_get_repository_url(self) -> None: + """Test Go resolver repository URL extraction.""" + from it_depends.go import GoResolver + + # Test with GitHub package + pkg = Package(name="github.com/user/repo", version="1.0.0", source="go") + url = GoResolver.get_repository_url(pkg) + assert url == "https://github.com/user/repo" + + # Test with non-GitHub package + pkg2 = Package(name="golang.org/x/tools", version="1.0.0", source="go") + url2 = GoResolver.get_repository_url(pkg2) + assert url2 is None + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])