diff --git a/README.md b/README.md index 1556a31d0..d01223dfd 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ sudo cp target/release/cx /usr/local/bin/ | `cx ask` | Ask questions in natural language, get intelligent responses | | `cx status` | System health check and status overview | | `cx demo` | Interactive demo of CX Linux capabilities | +| `python -m cx.dependency_conflict_predictor` | Pre-install apt/pip conflict prediction with confidence + suggestions | ### Core Capabilities diff --git a/cx/dependency_conflict_predictor.py b/cx/dependency_conflict_predictor.py new file mode 100644 index 000000000..65a00fea6 --- /dev/null +++ b/cx/dependency_conflict_predictor.py @@ -0,0 +1,545 @@ +""" +Copyright (c) 2026 AI Venture Holdings LLC +Licensed under the Business Source License 1.1 +You may not use this file except in compliance with the License. + +Dependency conflict prediction for CX Linux pre-install flows. + +MVP scope: +- Analyze apt/dpkg metadata before install. +- Predict conflicts with confidence scores. +- Rank resolution suggestions by safety. +- Include a pip conflict check path. +- Provide CLI output for operators. +""" + +from __future__ import annotations + +import argparse +import importlib.metadata +import json +import re +import shutil +import subprocess +from dataclasses import asdict, dataclass +from typing import Callable, Dict, List, Optional, Sequence, Tuple + +from packaging.requirements import InvalidRequirement, Requirement +from packaging.specifiers import InvalidSpecifier, SpecifierSet +from packaging.version import InvalidVersion, Version + +APT_MANY_MISSING_DEPS_THRESHOLD = 5 +HIGH_CONFIDENCE_THRESHOLD = 0.85 +_SAFE_SYSTEM_PATH = "/usr/bin:/bin:/usr/sbin:/sbin" +_SAFE_COMMANDS = {"apt-cache", "dpkg-query"} +_APT_PACKAGE_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9+.:_-]*$") +_PIP_SPEC_RE = re.compile( + r"^[A-Za-z0-9_.-]+(?:\s*(?:==|!=|>=|<=|>|<)\s*[A-Za-z0-9_.!+-]+(?:\s*,\s*(?:==|!=|>=|<=|>|<)\s*[A-Za-z0-9_.!+-]+)*)?$" +) + + +@dataclass +class ConflictFinding: + ecosystem: str # apt | pip + package: str + issue: str + confidence: float + evidence: str + + +@dataclass +class ResolutionSuggestion: + action: str + safety_score: float + rationale: str + + +@dataclass +class PredictionResult: + findings: List[ConflictFinding] + suggestions: List[ResolutionSuggestion] + + @property + def overall_confidence(self) -> float: + if not self.findings: + return 0.0 + return round(sum(item.confidence for item in self.findings) / len(self.findings), 3) + + +def _run_cmd(command: Sequence[str]) -> str: + if not command or command[0] not in _SAFE_COMMANDS: + return "" + + executable = shutil.which(command[0], path=_SAFE_SYSTEM_PATH) + if executable is None: + return "" + + try: + result = subprocess.run( + [executable, *command[1:]], + capture_output=True, + text=True, + check=False, + env={"PATH": _SAFE_SYSTEM_PATH}, + cwd="/", + timeout=15, + ) + except FileNotFoundError: + return "" + if result.returncode != 0: + return "" + return result.stdout + + +def _parse_name_from_alt_dep(token: str) -> str: + token = token.strip().split("|")[0].strip() + token = token.split(":")[0] + token = token.split(" ")[0] + return token + + +def _parse_apt_field(line: str, field: str) -> List[str]: + if not line.startswith(field + ":"): + return [] + value = line.split(":", 1)[1].strip() + if not value: + return [] + items: List[str] = [] + for part in value.split(","): + part = part.strip() + name = _parse_name_from_alt_dep(part) + if name: + items.append(name) + return items + + +def _iter_apt_fields(show_output: str, field: str) -> List[str]: + items: List[str] = [] + current_value = "" + + for raw_line in show_output.splitlines(): + if raw_line.startswith(field + ":"): + if current_value: + items.extend(_parse_apt_field(f"{field}: {current_value}", field)) + current_value = raw_line.split(":", 1)[1].strip() + continue + + if current_value and raw_line.startswith(" "): + current_value = f"{current_value} {raw_line.strip()}".strip() + continue + + if current_value: + items.extend(_parse_apt_field(f"{field}: {current_value}", field)) + current_value = "" + + if current_value: + items.extend(_parse_apt_field(f"{field}: {current_value}", field)) + + return items + + +def get_installed_dpkg_packages(run_cmd: Callable[[Sequence[str]], str] = _run_cmd) -> Dict[str, str]: + output = run_cmd(["dpkg-query", "-W", "-f=${Package}\t${Version}\n"]) + installed: Dict[str, str] = {} + for row in output.splitlines(): + cols = row.strip().split("\t") + if len(cols) == 2: + installed[cols[0]] = cols[1] + return installed + + +def inspect_apt_package( + package: str, + installed: Dict[str, str], + run_cmd: Callable[[Sequence[str]], str] = _run_cmd, +) -> List[ConflictFinding]: + findings: List[ConflictFinding] = [] + + show_output = run_cmd(["apt-cache", "show", package]) + if not show_output: + findings.append( + ConflictFinding( + ecosystem="apt", + package=package, + issue="package-not-found", + confidence=0.8, + evidence=f"apt-cache show {package} returned no metadata", + ) + ) + return findings + + conflicts = _iter_apt_fields(show_output, "Conflicts") + breaks = _iter_apt_fields(show_output, "Breaks") + depends = _iter_apt_fields(show_output, "Depends") + + for target in conflicts: + if target in installed: + findings.append( + ConflictFinding( + ecosystem="apt", + package=package, + issue="conflicts-installed-package", + confidence=0.95, + evidence=f"{package} Conflicts with installed package {target}", + ) + ) + + for target in breaks: + if target in installed: + findings.append( + ConflictFinding( + ecosystem="apt", + package=package, + issue="breaks-installed-package", + confidence=0.9, + evidence=f"{package} Breaks installed package {target}", + ) + ) + + missing = [dep for dep in depends if dep and dep not in installed] + if len(missing) >= APT_MANY_MISSING_DEPS_THRESHOLD: + findings.append( + ConflictFinding( + ecosystem="apt", + package=package, + issue="many-missing-dependencies", + confidence=0.55, + evidence=f"{package} has {len(missing)} dependencies not currently installed", + ) + ) + + return findings + + +def _parse_req_name_and_constraints(spec: str) -> Tuple[str, str]: + cleaned = spec.strip() + if not cleaned: + return "", "" + + try: + requirement = Requirement(cleaned) + return requirement.name.lower(), str(requirement.specifier) + except InvalidRequirement: + name_end = 0 + while name_end < len(cleaned) and cleaned[name_end] in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_.-": + name_end += 1 + + if name_end == 0: + return cleaned.lower(), "" + + return cleaned[:name_end].lower(), cleaned[name_end:].strip() + + +def _parse_version(version: str) -> Version: + return Version(version) + + +def _version_satisfies_constraint(version: str, constraint: str) -> bool: + if not constraint: + return True + + try: + return _parse_version(version) in SpecifierSet(constraint) + except (InvalidSpecifier, InvalidVersion): + return False + + +def _find_unsupported_constraint_clauses(constraint: str) -> List[str]: + unsupported: List[str] = [] + for clause in [c.strip() for c in constraint.split(",") if c.strip()]: + try: + SpecifierSet(clause) + except InvalidSpecifier: + unsupported.append(clause) + return unsupported + + +def _build_requested_constraint_map(requested: Sequence[Tuple[str, str]]) -> Dict[str, str]: + return dict(requested) + + +def _extract_constraint_candidate_versions(*constraints: str) -> List[Version]: + candidates: Dict[str, Version] = {} + + for raw in ("0", "0.1", "1", "1.0", "2", "10"): + candidates[raw] = Version(raw) + + for constraint in constraints: + for raw in re.findall(r"[0-9][A-Za-z0-9_.!+-]*", constraint): + try: + version = Version(raw) + except InvalidVersion: + continue + + candidates[str(version)] = version + + release = list(version.release or (0,)) + if not release: + continue + + previous_release = release[:] + previous_release[-1] = max(0, previous_release[-1] - 1) + next_release = release[:] + next_release[-1] += 1 + patch_release = [*release, 0] + + for parts in (previous_release, next_release, patch_release): + derived = ".".join(str(part) for part in parts) + candidates[derived] = Version(derived) + + return list(candidates.values()) + + +def _constraints_overlap(left: str, right: str) -> bool: + if not left or not right: + return True + + try: + left_spec = SpecifierSet(left) + right_spec = SpecifierSet(right) + except InvalidSpecifier: + return False + + for candidate in _extract_constraint_candidate_versions(left, right): + if candidate in left_spec and candidate in right_spec: + return True + return False + + +def _inspect_requested_pip_constraints( + requested: Sequence[Tuple[str, str]], + installed: Dict[str, str], +) -> List[ConflictFinding]: + findings: List[ConflictFinding] = [] + + for name, constraint in requested: + for clause in _find_unsupported_constraint_clauses(constraint): + findings.append( + ConflictFinding( + ecosystem="pip", + package=name, + issue="unsupported-constraint", + confidence=0.7, + evidence=f"Unsupported version clause '{clause}' in requested constraint '{constraint}'", + ) + ) + + installed_version = installed.get(name) + if installed_version and constraint and not _version_satisfies_constraint(installed_version, constraint): + findings.append( + ConflictFinding( + ecosystem="pip", + package=name, + issue="installed-version-violates-requested-constraint", + confidence=0.85, + evidence=f"Installed {name}=={installed_version} does not satisfy {constraint}", + ) + ) + + return findings + + +def _inspect_reverse_dependency_risks( + distributions: Sequence[importlib.metadata.Distribution], + requested_constraints: Dict[str, str], +) -> List[ConflictFinding]: + findings: List[ConflictFinding] = [] + + for dist in distributions: + parent = (dist.metadata.get("Name") or "").lower() + if not parent: + continue + + for requirement in dist.requires or []: + dep_name, dep_constraint = _parse_req_name_and_constraints(requirement) + requested_constraint = requested_constraints.get(dep_name) + if requested_constraint and dep_constraint and not _constraints_overlap(dep_constraint, requested_constraint): + findings.append( + ConflictFinding( + ecosystem="pip", + package=dep_name, + issue="reverse-dependency-constraint-risk", + confidence=0.65, + evidence=f"{parent} requires '{requirement}', requested '{dep_name}{requested_constraint}'", + ) + ) + + return findings + + +def inspect_pip_requirements(requested_specs: List[str]) -> List[ConflictFinding]: + findings: List[ConflictFinding] = [] + if not requested_specs: + return findings + + distributions = list(importlib.metadata.distributions()) + installed: Dict[str, str] = {} + for dist in distributions: + name = (dist.metadata.get("Name") or "").lower() + if name: + installed[name] = dist.version + + requested = [_parse_req_name_and_constraints(spec) for spec in requested_specs] + findings.extend(_inspect_requested_pip_constraints(requested, installed)) + findings.extend(_inspect_reverse_dependency_risks(distributions, _build_requested_constraint_map(requested))) + return findings + + +def rank_suggestions(findings: List[ConflictFinding]) -> List[ResolutionSuggestion]: + if not findings: + return [ + ResolutionSuggestion( + action="Proceed with install", + safety_score=0.97, + rationale="No obvious package conflicts detected in current metadata scan.", + ) + ] + + return [ + ResolutionSuggestion( + action="Dry-run apt transaction (apt-get -s install ...) before applying", + safety_score=0.96, + rationale="Simulation validates solver decisions without changing system state.", + ), + ResolutionSuggestion( + action="Prefer non-destructive version pinning over package removal", + safety_score=0.9, + rationale="Pinning lowers breakage risk while preserving currently working packages.", + ), + ResolutionSuggestion( + action="Isolate risky pip installs in virtualenv", + safety_score=0.88, + rationale="Environment isolation prevents global dependency contamination.", + ), + ResolutionSuggestion( + action="Remove conflicting packages only after explicit review", + safety_score=0.52, + rationale="Package removal can cascade into service disruption.", + ), + ] + + +def predict_conflicts( + apt_packages: List[str], + pip_requirements: Optional[List[str]] = None, + run_cmd: Callable[[Sequence[str]], str] = _run_cmd, +) -> PredictionResult: + installed = get_installed_dpkg_packages(run_cmd=run_cmd) + findings: List[ConflictFinding] = [] + + for package in apt_packages: + findings.extend(inspect_apt_package(package, installed=installed, run_cmd=run_cmd)) + + findings.extend(inspect_pip_requirements(pip_requirements or [])) + + suggestions = rank_suggestions(findings) + return PredictionResult(findings=findings, suggestions=suggestions) + + +def render_cli(result: PredictionResult) -> None: + try: + from rich.console import Console + from rich.table import Table + except ImportError: + print(f"Conflict prediction confidence: {result.overall_confidence:.3f}") + print("\nPredicted Conflicts") + print("-" * 80) + + if not result.findings: + print("none | No high-risk conflicts found") + else: + for finding in result.findings: + print( + f"{finding.ecosystem} | {finding.package} | {finding.issue} | " + f"{finding.confidence:.3f} | {finding.evidence}" + ) + + print("\nResolution Suggestions (ranked by safety)") + print("-" * 80) + for suggestion in result.suggestions: + print(f"{suggestion.safety_score:.2f} | {suggestion.action} | {suggestion.rationale}") + return + + console = Console() + console.print(f"Conflict prediction confidence: {result.overall_confidence:.3f}") + + findings_table = Table(title="Predicted Conflicts") + findings_table.add_column("Ecosystem") + findings_table.add_column("Package") + findings_table.add_column("Issue") + findings_table.add_column("Confidence", justify="right") + findings_table.add_column("Evidence") + + if result.findings: + for finding in result.findings: + findings_table.add_row( + finding.ecosystem, + finding.package, + finding.issue, + f"{finding.confidence:.3f}", + finding.evidence, + ) + else: + findings_table.add_row("none", "-", "-", "0.000", "No high-risk conflicts found") + + suggestions_table = Table(title="Resolution Suggestions") + suggestions_table.add_column("Safety", justify="right") + suggestions_table.add_column("Action") + suggestions_table.add_column("Rationale") + + for suggestion in result.suggestions: + suggestions_table.add_row( + f"{suggestion.safety_score:.2f}", + suggestion.action, + suggestion.rationale, + ) + + console.print(findings_table) + console.print(suggestions_table) + + +def _is_valid_pip_spec(spec: str) -> bool: + if not _PIP_SPEC_RE.fullmatch(spec): + return False + try: + Requirement(spec) + except InvalidRequirement: + return False + return True + + +def main() -> int: + parser = argparse.ArgumentParser(description="CX pre-install dependency conflict predictor") + parser.add_argument("--apt", nargs="*", default=[], help="apt packages planned for install") + parser.add_argument("--pip", nargs="*", default=[], help="pip requirement specs planned for install") + parser.add_argument("--json", action="store_true", help="Emit JSON output") + + args = parser.parse_args() + + invalid_apt = [package for package in args.apt if not _APT_PACKAGE_RE.fullmatch(package)] + if invalid_apt: + parser.error(f"Invalid apt package name(s): {', '.join(repr(package) for package in invalid_apt)}") + + invalid_pip = [spec for spec in args.pip if not _is_valid_pip_spec(spec)] + if invalid_pip: + parser.error(f"Invalid pip requirement spec(s): {', '.join(repr(spec) for spec in invalid_pip)}") + + result = predict_conflicts(apt_packages=args.apt, pip_requirements=args.pip) + + if args.json: + payload = { + "overall_confidence": result.overall_confidence, + "findings": [asdict(f) for f in result.findings], + "suggestions": [asdict(s) for s in result.suggestions], + } + print(json.dumps(payload, indent=2)) + else: + render_cli(result) + + if any(f.confidence >= HIGH_CONFIDENCE_THRESHOLD for f in result.findings): + return 2 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/cx/requirements.txt b/cx/requirements.txt index 15eba01d7..a8dbc5a5d 100644 --- a/cx/requirements.txt +++ b/cx/requirements.txt @@ -4,6 +4,7 @@ # Core dependencies rich>=13.0.0 # Terminal UI and console formatting psutil>=5.8.0 # System and process monitoring +packaging>=24.0 # PEP 440 version and specifier parsing # Enterprise security dependencies cryptography>=41.0.0 # Data encryption for sensitive fields (email addresses) diff --git a/docs/dependency-conflict-predictor.md b/docs/dependency-conflict-predictor.md new file mode 100644 index 000000000..486e009e1 --- /dev/null +++ b/docs/dependency-conflict-predictor.md @@ -0,0 +1,37 @@ +# Dependency Conflict Predictor (MVP) + +CX Linux includes an MVP pre-install dependency conflict predictor for package operations. + +## What it checks + +- **apt/dpkg path** + - Reads installed package set from `dpkg-query` + - Reads package metadata from `apt-cache show` + - Flags likely `Conflicts`/`Breaks` against currently installed packages + - Emits a confidence score per finding + +- **pip path** + - Parses requested pip constraints (e.g. `urllib3==2.0.0`) + - Compares against current Python environment metadata + - Flags direct constraint mismatches and reverse dependency risks + +- **Resolution suggestions** + - Ranked by safety (higher first) + - Includes dry-run and isolation-first guidance + +## CLI usage + +```bash +python -m cx.dependency_conflict_predictor --apt nginx postgresql --pip "urllib3==2.1.0" +``` + +JSON output: + +```bash +python -m cx.dependency_conflict_predictor --apt nginx --json +``` + +Exit code behavior: + +- `0`: no high-confidence conflict detected +- `2`: at least one high-confidence conflict found (`confidence >= 0.85`) diff --git a/tests/test_dependency_conflict_predictor.py b/tests/test_dependency_conflict_predictor.py new file mode 100644 index 000000000..24d09dfb1 --- /dev/null +++ b/tests/test_dependency_conflict_predictor.py @@ -0,0 +1,202 @@ +""" +Copyright (c) 2026 AI Venture Holdings LLC +Licensed under the Business Source License 1.1 +You may not use this file except in compliance with the License. + +Regression tests for the dependency conflict predictor. +""" + +import unittest +from unittest.mock import patch + +from cx.dependency_conflict_predictor import ( + ConflictFinding, + _parse_req_name_and_constraints, + _run_cmd, + _version_satisfies_constraint, + inspect_apt_package, + inspect_pip_requirements, + predict_conflicts, + rank_suggestions, +) + + +class FakeDist: + def __init__(self, name, version, requires=None): + self.metadata = {"Name": name} + self.version = version + self.requires = requires or [] + + +class TestDependencyConflictPredictor(unittest.TestCase): + def test_version_constraint_helper(self): + self.assertTrue(_version_satisfies_constraint("1.2.3", ">=1.0,<2.0")) + self.assertFalse(_version_satisfies_constraint("2.1.0", ">=1.0,<2.0")) + self.assertTrue(_version_satisfies_constraint("1.2.3", "==1.2.3")) + self.assertTrue(_version_satisfies_constraint("1.0a1", "==1.0a1")) + self.assertTrue(_version_satisfies_constraint("1.2.3", "")) + self.assertTrue(_version_satisfies_constraint("1.2.3", "!=2.0.0")) + self.assertFalse(_version_satisfies_constraint("1.2.3", ">>2.0.0")) + + def test_parse_req_name_and_constraints_avoids_regex_fallback(self): + self.assertEqual(_parse_req_name_and_constraints("urllib3=>1.26"), ("urllib3", "=>1.26")) + self.assertEqual(_parse_req_name_and_constraints(" requests ~=2.31"), ("requests", "~=2.31")) + self.assertEqual(_parse_req_name_and_constraints(">=1.0"), (">=1.0", "")) + + def test_apt_conflict_detection(self): + installed = {"libssl1.1": "1.1.1", "bash": "5.2"} + + def fake_run(cmd): + if cmd[:2] == ["apt-cache", "show"]: + return """Package: demo-app +Depends: libc6, python3 +Conflicts: libssl1.1 +Breaks: old-demo +""" + return "" + + findings = inspect_apt_package("demo-app", installed=installed, run_cmd=fake_run) + issues = {f.issue for f in findings} + self.assertIn("conflicts-installed-package", issues) + + def test_apt_conflict_detection_with_continuation_lines(self): + installed = {"libssl1.1": "1.1.1", "legacy-lib": "2.0", "bash": "5.2"} + + def fake_run(cmd): + if cmd[:2] == ["apt-cache", "show"]: + return """Package: demo-app +Depends: libc6, + python3, + bash +Conflicts: old-lib, + libssl1.1 +Breaks: unused-lib, + legacy-lib +""" + return "" + + findings = inspect_apt_package("demo-app", installed=installed, run_cmd=fake_run) + issues = {f.issue for f in findings} + evidence = "\n".join(f.evidence for f in findings) + + self.assertIn("conflicts-installed-package", issues) + self.assertIn("breaks-installed-package", issues) + self.assertIn("libssl1.1", evidence) + self.assertIn("legacy-lib", evidence) + + def test_apt_package_not_found(self): + findings = inspect_apt_package( + "missing-app", + installed={}, + run_cmd=lambda _cmd: "", + ) + self.assertEqual(len(findings), 1) + self.assertEqual(findings[0].issue, "package-not-found") + + @patch("importlib.metadata.distributions") + def test_pip_conflict_detection(self, mock_distributions): + mock_distributions.return_value = [ + FakeDist("requests", "2.28.0", ["urllib3>=1.25,<1.27"]), + FakeDist("urllib3", "1.26.6", []), + ] + + findings = inspect_pip_requirements(["urllib3==2.1.0"]) + issues = {f.issue for f in findings} + self.assertIn("installed-version-violates-requested-constraint", issues) + + @patch("importlib.metadata.distributions") + def test_pip_reports_unsupported_constraints(self, mock_distributions): + mock_distributions.return_value = [FakeDist("urllib3", "1.26.6", [])] + + findings = inspect_pip_requirements(["urllib3=>1.26"]) + issues = {f.issue for f in findings} + evidence = "\n".join(f.evidence for f in findings) + + self.assertIn("unsupported-constraint", issues) + self.assertIn("=>1.26", evidence) + + @patch("importlib.metadata.distributions") + def test_predict_conflicts_includes_suggestions(self, mock_distributions): + mock_distributions.return_value = [FakeDist("urllib3", "1.26.6", [])] + + def fake_run(cmd): + if cmd[0] == "dpkg-query": + return "bash\t5.2\n" + if cmd[:2] == ["apt-cache", "show"]: + return "Package: demo\nDepends: bash\n" + return "" + + result = predict_conflicts( + apt_packages=["demo"], + pip_requirements=["urllib3==2.0.0"], + run_cmd=fake_run, + ) + self.assertGreaterEqual(len(result.suggestions), 1) + self.assertGreaterEqual(result.overall_confidence, 0.5) + self.assertTrue(all(f.issue != "unsupported-constraint" for f in result.findings)) + + @patch("importlib.metadata.distributions") + def test_pip_reports_multiple_reverse_dependency_risks_per_distribution(self, mock_distributions): + mock_distributions.return_value = [ + FakeDist( + "service-a", + "1.0.0", + ["urllib3<2.0", "requests<2.30"], + ), + FakeDist("urllib3", "1.26.6", []), + FakeDist("requests", "2.28.0", []), + ] + + findings = inspect_pip_requirements(["urllib3==2.1.0", "requests==2.31.0"]) + reverse_dependency_findings = [ + finding for finding in findings if finding.issue == "reverse-dependency-constraint-risk" + ] + + self.assertEqual(len(reverse_dependency_findings), 2) + self.assertEqual({finding.package for finding in reverse_dependency_findings}, {"urllib3", "requests"}) + + def test_rank_suggestions_preserves_descending_safety_order(self): + findings = [ + ConflictFinding( + ecosystem="apt", + package="demo-app", + issue="conflicts-installed-package", + confidence=0.95, + evidence="demo-app conflicts with installed package libssl1.1", + ) + ] + suggestions = rank_suggestions(findings) + self.assertGreater(len(suggestions), 1) + self.assertEqual( + [suggestion.safety_score for suggestion in suggestions], + sorted((suggestion.safety_score for suggestion in suggestions), reverse=True), + ) + + @patch("cx.dependency_conflict_predictor.subprocess.run") + @patch("cx.dependency_conflict_predictor.shutil.which", return_value="/usr/bin/apt-cache") + def test_run_cmd_uses_resolved_safe_binary(self, mock_which, mock_run): + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = "Package: demo\n" + + output = _run_cmd(["apt-cache", "show", "demo-app"]) + + self.assertEqual(output, "Package: demo\n") + mock_which.assert_called_once_with("apt-cache", path="/usr/bin:/bin:/usr/sbin:/sbin") + mock_run.assert_called_once_with( + ["/usr/bin/apt-cache", "show", "demo-app"], + capture_output=True, + text=True, + check=False, + env={"PATH": "/usr/bin:/bin:/usr/sbin:/sbin"}, + cwd="/", + timeout=15, + ) + + @patch("cx.dependency_conflict_predictor.subprocess.run") + def test_run_cmd_rejects_non_allowlisted_binary(self, mock_run): + self.assertEqual(_run_cmd(["python3", "-c", "print(1)"]), "") + mock_run.assert_not_called() + + +if __name__ == "__main__": + unittest.main()