diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b840addb..dfac6064f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Non-skill integrators (agent, instruction, prompt, command, hook script-copy) silently adopt byte-identical pre-existing files so a degraded `deployed_files=[]` lockfile no longer permanently blocks installs gated by `required-packages-deployed`. (#1313) - `apm audit` drift check now returns skip-with-info (`passed=True`) when the install cache is cold, instead of failing the audit; bare `apm audit` surfaces the skip reason on stderr so CI pipelines that have not yet run `apm install` are not incorrectly red-marked. (#1289) - `extends: org` now correctly layers `dependencies.require` and `dependencies.deny` from the parent policy when the child omits the `dependencies:` block entirely; `None` signals "no opinion" (transparent) while `[]` signals explicit override. (#1290) - CI self-check job now uses `setup-only: true` + `apm audit --ci --no-drift` so managed files are not overwritten by `apm install` before `content-integrity` runs; documented the audit-only CI pattern and the install-before-audit blind spot in the enterprise and CI/CD guides. (#1291) diff --git a/src/apm_cli/install/services.py b/src/apm_cli/install/services.py index d577d992e..9c36027f4 100644 --- a/src/apm_cli/install/services.py +++ b/src/apm_cli/install/services.py @@ -228,7 +228,7 @@ def _format_target_collapse(paths: list[str], verbose: bool) -> tuple[str, list[ # Aggregate per-primitive across targets so we emit ONE line per kind # (per the 1/2/3+ collapse rule), not one per target. - # Structure: { prim_name: {"files": int, "label": str, "paths": [str]} } + # Structure: { prim_name: {"files": int, "adopted": int, "label": str, "paths": [str]} } _per_kind: dict[str, dict[str, Any]] = {} for _prim_name, _entry in _dispatch.items(): @@ -236,6 +236,7 @@ def _format_target_collapse(paths: list[str], verbose: bool) -> tuple[str, list[ continue # skills handled separately _integrator = _INTEGRATOR_KWARGS[_prim_name] _agg_files = 0 + _agg_adopted = 0 _agg_paths: list[str] = [] _label = _prim_name for _target in targets: @@ -253,9 +254,28 @@ def _format_target_collapse(paths: list[str], verbose: bool) -> tuple[str, list[ result["links_resolved"] += _int_result.links_resolved for tp in _int_result.target_paths: deployed.append(_deployed_path_entry(tp, project_root, targets)) - if _int_result.files_integrated <= 0: + _adopted_attr = getattr(_int_result, "files_adopted", 0) + # Coerce defensively: subclasses (e.g. HookIntegrationResult) + # always set this, but tests use MagicMock results which + # auto-attribute to MagicMock objects whose ``__int__`` is 1. + # Treat anything that is not a real int as 0 so we never + # invent fake adopt counts. + _adopted = _adopted_attr if isinstance(_adopted_attr, int) else 0 + # Show the per-kind line whenever ANY work happened -- either + # a fresh integrate or a silent adopt of pre-existing + # byte-identical files. Adopt-only runs (e.g. re-install + # after lockfile wipe) used to print nothing here, which made + # the install summary look like a no-op even though the + # lockfile WAS being repopulated. Surfacing adopt counts + # restores operator trust in CI. + if _int_result.files_integrated <= 0 and _adopted <= 0: continue _agg_files += _int_result.files_integrated + _agg_adopted += _adopted + # Only count fresh integrations against the package counter + # so totals like "3 prompts integrated" stay truthful; + # adopted files are surfaced separately in the per-kind + # line. result[_entry.counter_key] += _int_result.files_integrated _effective_root = _mapping.deploy_root or _target.root_dir _deploy_dir = ( @@ -278,9 +298,10 @@ def _format_target_collapse(paths: list[str], verbose: bool) -> tuple[str, list[ _label = _prim_name _agg_paths.append(_deploy_dir) - if _agg_files > 0: + if _agg_files > 0 or _agg_adopted > 0: _per_kind[_prim_name] = { "files": _agg_files, + "adopted": _agg_adopted, "label": _label, "paths": _agg_paths, } @@ -291,12 +312,24 @@ def _format_target_collapse(paths: list[str], verbose: bool) -> tuple[str, list[ continue _info = _per_kind[_prim_name] _suffix, _expansion = _format_target_collapse(_info["paths"], _verbose) + # Build the verb + count phrase. When at least one file was + # freshly integrated we lead with "N X integrated"; pure-adopt + # runs (no fresh writes) lead with "N X adopted" so the line + # still appears and the count is truthful. + _files = _info["files"] + _adopted = _info["adopted"] + if _files > 0: + _verb_phrase = f"{_files} {_info['label']} integrated" + if _adopted > 0: + _verb_phrase = f"{_verb_phrase} ({_adopted} adopted)" + else: + _verb_phrase = f"{_adopted} {_info['label']} adopted" if _expansion: - _log_integration(f" |-- {_info['files']} {_info['label']} integrated:") + _log_integration(f" |-- {_verb_phrase}:") for line in _expansion: _log_integration(line) else: - _log_integration(f" |-- {_info['files']} {_info['label']} integrated -> {_suffix}") + _log_integration(f" |-- {_verb_phrase} -> {_suffix}") skill_result = skill_integrator.integrate_package_skill( package_info, diff --git a/src/apm_cli/integration/agent_integrator.py b/src/apm_cli/integration/agent_integrator.py index 5327d95a0..2ff4b9f18 100644 --- a/src/apm_cli/integration/agent_integrator.py +++ b/src/apm_cli/integration/agent_integrator.py @@ -14,6 +14,7 @@ import yaml from apm_cli.integration.base_integrator import BaseIntegrator, IntegrationResult +from apm_cli.utils.path_security import PathTraversalError, ensure_path_within from apm_cli.utils.paths import portable_relpath if TYPE_CHECKING: @@ -119,6 +120,7 @@ def integrate_agents_for_target( files_integrated = 0 files_skipped = 0 + files_adopted = 0 target_paths: list[Path] = [] total_links_resolved = 0 @@ -129,8 +131,36 @@ def integrate_agents_for_target( target, ) target_path = agents_dir / target_filename + # Defense-in-depth: target_filename comes from + # get_target_filename_for_target which strips path separators, + # but assert containment under agents_dir so a future + # regression cannot smuggle a traversal sequence past the + # adopt branch (which fires *before* check_collision and + # would otherwise blindly trust the computed path). Mirrors + # the guard already in command_integrator and + # instruction_integrator. + try: + ensure_path_within(target_path, agents_dir) + except PathTraversalError as exc: + if diagnostics is not None: + diagnostics.warn( + message=f"Rejected agent target path: {exc}", + package=package_info.package.name, + ) + files_skipped += 1 + continue + rel_path = portable_relpath(target_path, project_root) + if self.is_content_identical_to_source(target_path, source_file): + # Pre-existing file is byte-identical to source -- silently + # adopt so deployed_files reflects reality. See + # BaseIntegrator.is_content_identical_to_source for the + # full rationale (catch-22 fix). + target_paths.append(target_path) + files_adopted += 1 + continue + if self.check_collision( target_path, rel_path, @@ -160,6 +190,7 @@ def integrate_agents_for_target( files_skipped=files_skipped, target_paths=target_paths, links_resolved=total_links_resolved, + files_adopted=files_adopted, ) def sync_for_target( @@ -383,6 +414,7 @@ def integrate_package_agents( files_integrated = 0 files_skipped = 0 + files_adopted = 0 target_paths: list[Path] = [] total_links_resolved = 0 @@ -393,18 +425,31 @@ def integrate_package_agents( copilot, ) target_path = agents_dir / target_filename - rel_path = portable_relpath(target_path, project_root) - - if self.check_collision( - target_path, rel_path, managed_files, force, diagnostics=diagnostics - ): + try: + ensure_path_within(target_path, agents_dir) + except PathTraversalError as exc: + if diagnostics is not None: + diagnostics.warn( + message=f"Rejected agent target path: {exc}", + package=package_info.package.name, + ) files_skipped += 1 continue + rel_path = portable_relpath(target_path, project_root) - links_resolved = self.copy_agent(source_file, target_path) - total_links_resolved += links_resolved - files_integrated += 1 - target_paths.append(target_path) + if self.is_content_identical_to_source(target_path, source_file): + target_paths.append(target_path) + files_adopted += 1 + else: + if self.check_collision( + target_path, rel_path, managed_files, force, diagnostics=diagnostics + ): + files_skipped += 1 + continue + links_resolved = self.copy_agent(source_file, target_path) + total_links_resolved += links_resolved + files_integrated += 1 + target_paths.append(target_path) if claude_agents_dir: claude_target = KNOWN_TARGETS["claude"] @@ -414,8 +459,20 @@ def integrate_package_agents( claude_target, ) claude_path = claude_agents_dir / claude_filename + try: + ensure_path_within(claude_path, claude_agents_dir) + except PathTraversalError as exc: + if diagnostics is not None: + diagnostics.warn( + message=f"Rejected claude agent target path: {exc}", + package=package_info.package.name, + ) + continue claude_rel = portable_relpath(claude_path, project_root) - if not self.check_collision( + if self.is_content_identical_to_source(claude_path, source_file): + target_paths.append(claude_path) + files_adopted += 1 + elif not self.check_collision( claude_path, claude_rel, managed_files, force, diagnostics=diagnostics ): self.copy_agent(source_file, claude_path) @@ -429,8 +486,20 @@ def integrate_package_agents( cursor_target, ) cursor_path = cursor_agents_dir / cursor_filename + try: + ensure_path_within(cursor_path, cursor_agents_dir) + except PathTraversalError as exc: + if diagnostics is not None: + diagnostics.warn( + message=f"Rejected cursor agent target path: {exc}", + package=package_info.package.name, + ) + continue cursor_rel = portable_relpath(cursor_path, project_root) - if not self.check_collision( + if self.is_content_identical_to_source(cursor_path, source_file): + target_paths.append(cursor_path) + files_adopted += 1 + elif not self.check_collision( cursor_path, cursor_rel, managed_files, force, diagnostics=diagnostics ): self.copy_agent(source_file, cursor_path) @@ -442,6 +511,7 @@ def integrate_package_agents( files_skipped=files_skipped, target_paths=target_paths, links_resolved=total_links_resolved, + files_adopted=files_adopted, ) # DEPRECATED: use get_target_filename_for_target(KNOWN_TARGETS["claude"], ...) instead. diff --git a/src/apm_cli/integration/base_integrator.py b/src/apm_cli/integration/base_integrator.py index 68ad81bf4..e37b52af4 100644 --- a/src/apm_cli/integration/base_integrator.py +++ b/src/apm_cli/integration/base_integrator.py @@ -1,5 +1,7 @@ """Base integrator with shared collision detection and sync logic.""" +import errno +import os import re from dataclasses import dataclass, field # noqa: F401 from pathlib import Path @@ -10,6 +12,11 @@ from apm_cli.utils.console import _rich_warning +class _SymlinkRaceError(OSError): + """Raised by ``_read_bytes_no_follow`` when the path becomes a symlink + between the pre-check and the open(). Caught locally; never bubbles.""" + + @dataclass class IntegrationResult: """Result of any file-level integration operation. @@ -33,6 +40,48 @@ class IntegrationResult: sub_skills_promoted: int = 0 skill_created: bool = False + # Number of pre-existing on-disk files that were silently *adopted* + # (byte-identical to source). Counted separately from + # ``files_integrated`` so the install summary can surface the work + # done in adopt-only runs instead of looking like a no-op. + files_adopted: int = 0 + + +def _read_bytes_no_follow(path: Path) -> bytes: + """Read *path* with ``O_NOFOLLOW`` semantics where supported. + + On POSIX, opens the file with ``os.O_NOFOLLOW`` so the kernel + rejects the open atomically if the final path component is a + symlink. This closes the TOCTOU race between + ``Path.is_symlink()`` and ``Path.read_bytes()`` exploited by a + co-tenant who can swap files for symlinks. + + On Windows (no ``O_NOFOLLOW``), falls back to a plain read; the + caller's upfront ``is_symlink()`` check plus ``ensure_path_within`` + at the integrator call sites provide the containment guarantee. + """ + flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) + nofollow = getattr(os, "O_NOFOLLOW", 0) + flags |= nofollow + try: + fd = os.open(str(path), flags) + except OSError as exc: + # ELOOP is the canonical errno for "O_NOFOLLOW refused to open + # a symlink"; some Linux kernels return EMLINK or ELOOP-equivalent. + if nofollow and exc.errno in (errno.ELOOP, getattr(errno, "EMLINK", -1)): + raise _SymlinkRaceError(exc.errno, f"Refused to follow symlink at {path}") from exc + raise + try: + chunks: list[bytes] = [] + while True: + chunk = os.read(fd, 65536) + if not chunk: + break + chunks.append(chunk) + return b"".join(chunks) + finally: + os.close(fd) + class BaseIntegrator: """Shared infrastructure for file-level integrators. @@ -109,6 +158,83 @@ def normalize_managed_files(managed_files: set[str] | None) -> set[str] | None: return None return {p.replace("\\", "/") for p in managed_files} + @staticmethod + def is_content_identical_to_source(target_path: Path, source_path: Path) -> bool: + """Return True if *target_path* is byte-identical to *source_path*. + + Used by non-skill integrators to silently *adopt* a pre-existing + on-disk file that already matches what APM would deploy. + + Why this exists + --------------- + Without this short-circuit, the per-file loops in + ``agent_integrator``, ``instruction_integrator``, ``prompt_integrator`` + and ``command_integrator`` would route the file straight into + :meth:`check_collision`. When the path is missing from + ``managed_files`` (e.g. lockfile was wiped, hand-edited, regenerated + by an older APM build, or the user's previous install crashed before + ``deployed_files`` was persisted) the file is treated as + "user-authored", *skipped*, and never appended to ``target_paths``. + + That in turn leaves ``deployed_files`` empty in the new lockfile, + which trips the ``required-packages-deployed`` policy check at the + next install. Because ``policy_gate`` runs *before* ``integrate`` + in ``pipeline.py``, the install can never self-heal -- a permanent + catch-22 lockout. + + ``skill_integrator`` already has an equivalent content-identity + adopt at ``_promote_sub_skills`` (target.exists() + + ``_dirs_equal``). This helper restores symmetry for non-skill + primitives. + + Conservative by design + ---------------------- + Only fires for *byte-identical* matches. Format-transforming + targets (``codex_agent``, ``cursor_rules``, ``claude_rules``, + ``windsurf_rules``, ``gemini_command``, ...) won't match -- they + keep the existing skip behavior. This means we never silently + adopt content that *might* have come from somewhere else; we only + adopt files that are demonstrably the package's own bytes already + on disk. + + TOCTOU hardening + ---------------- + The classic ``is_symlink()`` -> ``read_bytes()`` sequence has a + race window: a hostile co-tenant on the same machine could swap + a regular file for a symlink between the two calls and cause + ``read_bytes()`` to follow it to an attacker-controlled path, + whose bytes might match source by construction. We close the + race by reading through ``os.open(..., O_NOFOLLOW)`` so the + kernel rejects the open atomically if the final component is a + symlink. ``O_NOFOLLOW`` is a no-op constant on platforms that + lack it (Windows), where the upfront ``is_symlink()`` check + plus ``ensure_path_within`` at the call site provide adequate + coverage (Windows also lacks the cheap unprivileged-symlink + creation primitive that makes this race practical on POSIX). + """ + try: + if not target_path.exists() or not source_path.exists(): + return False + # Cheap pre-check: reject obvious symlinks so we never even + # attempt the open. Race-free verification follows below via + # O_NOFOLLOW. + if target_path.is_symlink() or source_path.is_symlink(): + return False + try: + target_bytes = _read_bytes_no_follow(target_path) + source_bytes = _read_bytes_no_follow(source_path) + except _SymlinkRaceError: + # The path turned into a symlink between the pre-check + # and the open() -- treat as non-identical so the caller + # falls through to ``check_collision`` and the file is + # NOT adopted. Silent (no diagnostic) because adopt is + # an optimisation; the user-authored skip path is the + # safe fallback. + return False + return target_bytes == source_bytes + except OSError: + return False + # Known integration prefixes that APM is allowed to deploy/remove under. # Derived from ``targets.KNOWN_TARGETS`` so adding a target auto-propagates. @staticmethod diff --git a/src/apm_cli/integration/command_integrator.py b/src/apm_cli/integration/command_integrator.py index 7ca7b6586..bc0405f12 100644 --- a/src/apm_cli/integration/command_integrator.py +++ b/src/apm_cli/integration/command_integrator.py @@ -505,6 +505,7 @@ def integrate_commands_for_target( commands_dir = target_root / mapping.subdir files_integrated = 0 files_skipped = 0 + files_adopted = 0 target_paths: list[Path] = [] total_links_resolved = 0 any_dropped_keys = False @@ -549,6 +550,13 @@ def integrate_commands_for_target( rel_path = portable_relpath(target_path, project_root) + if self.is_content_identical_to_source(target_path, prompt_file): + # Pre-existing file is byte-identical to source -- silently + # adopt. See BaseIntegrator.is_content_identical_to_source. + target_paths.append(target_path) + files_adopted += 1 + continue + if self.check_collision( target_path, rel_path, @@ -619,6 +627,7 @@ def integrate_commands_for_target( files_skipped=files_skipped, target_paths=target_paths, links_resolved=total_links_resolved, + files_adopted=files_adopted, ) def sync_for_target( diff --git a/src/apm_cli/integration/hook_integrator.py b/src/apm_cli/integration/hook_integrator.py index 5c7f2475a..70b84cd6c 100644 --- a/src/apm_cli/integration/hook_integrator.py +++ b/src/apm_cli/integration/hook_integrator.py @@ -559,6 +559,7 @@ def integrate_package_hooks( package_name = self._get_package_name(package_info) hooks_integrated = 0 scripts_copied = 0 + scripts_adopted = 0 target_paths: list[Path] = [] for hook_file in hook_files: @@ -599,6 +600,10 @@ def integrate_package_hooks( for source_file, target_rel in scripts: target_script = project_root / target_rel ensure_path_within(target_script, project_root) + if self.is_content_identical_to_source(target_script, source_file): + target_paths.append(target_script) + scripts_adopted += 1 + continue if self.check_collision( target_script, target_rel, managed_files, force, diagnostics=diagnostics ): @@ -614,6 +619,7 @@ def integrate_package_hooks( files_skipped=0, target_paths=target_paths, scripts_copied=scripts_copied, + files_adopted=scripts_adopted, ) # ------------------------------------------------------------------ @@ -659,6 +665,7 @@ def _integrate_merged_hooks( package_name = self._get_package_name(package_info) hooks_integrated = 0 scripts_copied = 0 + scripts_adopted = 0 target_paths: list[Path] = [] # Events whose prior-owned entries have already been cleared on # this install run. Packages can contribute to the same event @@ -780,6 +787,10 @@ def _integrate_merged_hooks( for source_file, target_rel in scripts: target_script = project_root / target_rel ensure_path_within(target_script, project_root) + if self.is_content_identical_to_source(target_script, source_file): + target_paths.append(target_script) + scripts_adopted += 1 + continue if self.check_collision( target_script, target_rel, @@ -807,6 +818,7 @@ def _integrate_merged_hooks( files_skipped=0, target_paths=target_paths, scripts_copied=scripts_copied, + files_adopted=scripts_adopted, ) # ------------------------------------------------------------------ diff --git a/src/apm_cli/integration/instruction_integrator.py b/src/apm_cli/integration/instruction_integrator.py index 01ec489ba..23620df1d 100644 --- a/src/apm_cli/integration/instruction_integrator.py +++ b/src/apm_cli/integration/instruction_integrator.py @@ -98,6 +98,7 @@ def integrate_instructions_for_target( files_integrated = 0 files_skipped = 0 + files_adopted = 0 target_paths: list[Path] = [] total_links_resolved = 0 @@ -119,6 +120,14 @@ def integrate_instructions_for_target( rel_path = portable_relpath(target_path, project_root) + if self.is_content_identical_to_source(target_path, source_file): + # Pre-existing file is byte-identical to source -- silently + # adopt so deployed_files reflects reality. See + # BaseIntegrator.is_content_identical_to_source for rationale. + target_paths.append(target_path) + files_adopted += 1 + continue + if self.check_collision( target_path, rel_path, @@ -148,6 +157,7 @@ def integrate_instructions_for_target( files_skipped=files_skipped, target_paths=target_paths, links_resolved=total_links_resolved, + files_adopted=files_adopted, ) def sync_for_target( diff --git a/src/apm_cli/integration/prompt_integrator.py b/src/apm_cli/integration/prompt_integrator.py index 512a7f596..20c48d715 100644 --- a/src/apm_cli/integration/prompt_integrator.py +++ b/src/apm_cli/integration/prompt_integrator.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Set # noqa: F401, UP035 from apm_cli.integration.base_integrator import BaseIntegrator, IntegrationResult +from apm_cli.utils.path_security import PathTraversalError, ensure_path_within from apm_cli.utils.paths import portable_relpath if TYPE_CHECKING: @@ -169,14 +170,36 @@ def integrate_package_prompts( # Process each prompt file files_integrated = 0 files_skipped = 0 + files_adopted = 0 target_paths = [] total_links_resolved = 0 for source_file in prompt_files: target_filename = self.get_target_filename(source_file, package_info.package.name) target_path = prompts_dir / target_filename + # Defense-in-depth: target_filename is derived from source + # file name; assert containment under prompts_dir to mirror + # the guard already present in command/instruction + # integrators. + try: + ensure_path_within(target_path, prompts_dir) + except PathTraversalError as exc: + if diagnostics is not None: + diagnostics.warn( + message=f"Rejected prompt target path: {exc}", + package=package_info.package.name, + ) + files_skipped += 1 + continue rel_path = portable_relpath(target_path, project_root) + if self.is_content_identical_to_source(target_path, source_file): + # Pre-existing file is byte-identical to source -- silently + # adopt. See BaseIntegrator.is_content_identical_to_source. + target_paths.append(target_path) + files_adopted += 1 + continue + if self.check_collision( target_path, rel_path, managed_files, force, diagnostics=diagnostics ): @@ -194,6 +217,7 @@ def integrate_package_prompts( files_skipped=files_skipped, target_paths=target_paths, links_resolved=total_links_resolved, + files_adopted=files_adopted, ) # DEPRECATED: use sync_for_target(...) instead. diff --git a/tests/integration/test_silent_adopt_existing_files_e2e.py b/tests/integration/test_silent_adopt_existing_files_e2e.py new file mode 100644 index 000000000..02a0057e6 --- /dev/null +++ b/tests/integration/test_silent_adopt_existing_files_e2e.py @@ -0,0 +1,216 @@ +"""End-to-end regression: silent adopt of byte-identical pre-existing files. + +Reproduces the catch-22 reported by zava-storefront where: + 1. A project's apm.lock loses ``deployed_files`` for non-skill packages + (e.g. lockfile wiped, hand-edited, partial install crash, regenerated + by an older APM build). + 2. The deployed files are still on disk, byte-identical to the + package's source. + 3. Re-installing on stock 0.13.0 silently treats those files as + "user-authored", skips them, and writes an empty ``deployed_files`` + back to the lockfile. + 4. The next install with ``required-packages-deployed`` enforced is + permanently blocked because the policy gate runs *before* integrate + in ``pipeline.py`` -- there's no path to self-heal. + +Post-fix, step 3 silently *adopts* the existing files (their bytes match +the package source) and repopulates ``deployed_files`` -- breaking the +catch-22. + +Requires network access and GITHUB_TOKEN/GITHUB_APM_PAT for GitHub API. +""" + +import shutil +import subprocess +from pathlib import Path + +import pytest +import yaml + +pytestmark = pytest.mark.requires_github_token + + +@pytest.fixture +def apm_command(): + """Resolve an apm binary that is wired to *this* checkout's source. + + The repo binary (homebrew/system ``apm``) may be a stable release that + predates the fix under test, which would silently make this regression + test pass against unfixed code. Prefer the CI-built artifact or a + venv-installed editable binary so the test exercises the in-repo + apm_cli source. + + Resolution order: + 1. ``APM_TEST_BINARY`` env override (explicit per-test pin). + 2. ``APM_BINARY_PATH`` env (CI sets this after the build step; + shared with ``apm_binary_path`` fixture in ``conftest.py``). + 3. Repo-local ``.venv/bin/apm`` (this worktree). + 4. Sibling ``../awd-cli/.venv/bin/apm`` (shared dev venv pointing + at this worktree via ``pip install -e .``). + 5. ``apm`` on PATH (last resort; may be stale). + """ + import os + + for env_var in ("APM_TEST_BINARY", "APM_BINARY_PATH"): + override = os.environ.get(env_var) + if override and Path(override).exists(): + return override + + repo_root = Path(__file__).parent.parent.parent + candidates = [ + repo_root / ".venv" / "bin" / "apm", + repo_root.parent / "awd-cli" / ".venv" / "bin" / "apm", + ] + for candidate in candidates: + if candidate.exists(): + return str(candidate) + + apm_on_path = shutil.which("apm") + if apm_on_path: + return apm_on_path + return "apm" + + +@pytest.fixture +def temp_project(tmp_path): + project_dir = tmp_path / "adopt-test" + project_dir.mkdir() + (project_dir / "apm.yml").write_text( + "name: adopt-test\n" + "version: 1.0.0\n" + "description: Test project for silent-adopt regression\n" + "dependencies:\n" + " apm: []\n" + " mcp: []\n" + ) + (project_dir / ".github").mkdir() + (project_dir / ".github" / "copilot-instructions.md").write_text("# test\n") + return project_dir + + +def _run_apm(apm_command, args, cwd, timeout=120): + return subprocess.run( + [apm_command] + args, # noqa: RUF005 + cwd=cwd, + capture_output=True, + text=True, + timeout=timeout, + ) + + +def _read_lockfile(project_dir): + lock_path = project_dir / "apm.lock.yaml" + if not lock_path.exists(): + return None + with open(lock_path) as f: + return yaml.safe_load(f) + + +def _all_deployed_files(lockfile) -> list[str]: + """Flatten every deployed_files entry across all locked deps.""" + out: list[str] = [] + deps = (lockfile or {}).get("dependencies", []) + if isinstance(deps, list): + for entry in deps: + out.extend(entry.get("deployed_files", []) or []) + return out + + +class TestSilentAdoptOfExistingFiles: + """Catch-22: degraded lockfile + identical files on disk -> must self-heal.""" + + def test_reinstall_with_wiped_lockfile_repopulates_deployed_files( + self, temp_project, apm_command + ): + """The exact zava-storefront reproducer. + + Steps: + 1. Install sample package; capture deployed_files set. + 2. Wipe apm.lock.yaml (simulates the degraded state -- lockfile + has no record of which files belong to APM). + 3. Re-install. Files on disk are byte-identical to source. + 4. Assert: the new lockfile records the SAME deployed_files set + (silently adopted), not an empty list. + + Pre-fix on main: assertion fails -- non-skill packages get empty + deployed_files, which would then trip + ``required-packages-deployed`` policy. + """ + result1 = _run_apm(apm_command, ["install", "microsoft/apm-sample-package"], temp_project) + assert result1.returncode == 0, ( + f"First install failed:\nstderr={result1.stderr}\nstdout={result1.stdout}" + ) + + lock1 = _read_lockfile(temp_project) + files_before = sorted(_all_deployed_files(lock1)) + assert files_before, "Test precondition: first install must populate deployed_files" + + # Snapshot disk state for byte-comparison after the re-install. + # deployed_files entries can be either files (agents, instructions, + # prompts, commands, hooks) or directories (skills) -- only snapshot + # plain files for byte-equality. + disk_before = { + f: (temp_project / f).read_bytes() for f in files_before if (temp_project / f).is_file() + } + assert disk_before, "Test precondition: at least one deployed file on disk" + + # --- Simulate the degraded lockfile state --- + (temp_project / "apm.lock.yaml").unlink() + + # Re-install. Files are still on disk, byte-identical to package source. + result2 = _run_apm(apm_command, ["install"], temp_project) + assert result2.returncode == 0, ( + f"Re-install failed:\nstderr={result2.stderr}\nstdout={result2.stdout}" + ) + + lock2 = _read_lockfile(temp_project) + files_after = sorted(_all_deployed_files(lock2)) + + assert files_after == files_before, ( + "deployed_files lost after lockfile-wipe + re-install. " + "This is the catch-22: degraded lockfile cannot self-heal because " + "non-skill integrators skip byte-identical files instead of adopting them.\n" + f" Before: {files_before}\n" + f" After: {files_after}" + ) + + # On-disk content must be unchanged (no spurious overwrites). + for f, content in disk_before.items(): + assert (temp_project / f).read_bytes() == content, ( + f"Adopt path must not modify on-disk bytes: {f} changed." + ) + + def test_required_packages_deployed_passes_after_lockfile_wipe(self, temp_project, apm_command): + """End-to-end: with adopt in place, ``apm audit`` (which runs the + same ``required-packages-deployed`` check the policy gate uses) + passes after a lockfile wipe + re-install -- proving the catch-22 + is broken. + + Skipped if the sample package isn't covered by a + ``required-packages-deployed`` policy in the test environment; + the lockfile-shape assertion above is the primary regression + guard. This test is the integration-level smoke that proves the + full pipeline now self-heals. + """ + # Initial install + r1 = _run_apm(apm_command, ["install", "microsoft/apm-sample-package"], temp_project) + assert r1.returncode == 0, f"first install: {r1.stderr}\n{r1.stdout}" + + # Wipe lockfile -- degraded state + (temp_project / "apm.lock.yaml").unlink() + + # Re-install with --no-policy to bypass any external policy and + # exercise just the integrator/lockfile path. (The fix lives in + # the integrator; --no-policy keeps this test independent of the + # current org policy fixtures.) + r2 = _run_apm(apm_command, ["install", "--no-policy"], temp_project) + assert r2.returncode == 0, f"re-install: {r2.stderr}\n{r2.stdout}" + + lock2 = _read_lockfile(temp_project) + files_after = _all_deployed_files(lock2) + + assert files_after, ( + "deployed_files must be repopulated after re-install -- " + "otherwise required-packages-deployed would block the next " + "install (catch-22)." + ) diff --git a/tests/unit/install/test_services_rendering.py b/tests/unit/install/test_services_rendering.py index 70f15dea8..0d3ccf7a2 100644 --- a/tests/unit/install/test_services_rendering.py +++ b/tests/unit/install/test_services_rendering.py @@ -27,18 +27,27 @@ # --------------------------------------------------------------------------- -def _make_integrator_returning(files_per_target: list[int]) -> MagicMock: +def _make_integrator_returning( + files_per_target: list[int], + adopted_per_target: list[int] | None = None, +) -> MagicMock: """Return a MagicMock whose integrate method returns sequential ``IntegrationResult``-like objects. Each call returns one entry from ``files_per_target`` -- so the first target sees ``files_per_target[0]`` files, etc. + + ``adopted_per_target`` mirrors ``files_per_target`` for the silent + adopt counter; defaults to all zeros. """ integrator = MagicMock() + if adopted_per_target is None: + adopted_per_target = [0] * len(files_per_target) results = [] - for n in files_per_target: + for n, a in zip(files_per_target, adopted_per_target, strict=False): r = MagicMock() r.files_integrated = n + r.files_adopted = a r.links_resolved = 0 r.target_paths = [] results.append(r) @@ -204,6 +213,84 @@ def test_targets_with_zero_files_excluded_from_paths(self, tmp_path: Path) -> No assert "," not in prompt_lines[0] +# --------------------------------------------------------------------------- +# Adopted-file visibility -- the install summary must show silent adopts +# --------------------------------------------------------------------------- + + +class TestAdoptedFileVisibility: + """Pre-fix the adopt branch was invisible. In an adopt-only run the + install summary printed nothing and looked like a no-op even though + the lockfile WAS being repopulated. These tests lock in the new + visibility contract: adopt counts surface in the per-kind line. + """ + + def _run( + self, + tmp_path: Path, + files_per_target: list[int], + adopted_per_target: list[int], + ) -> list[str]: + from apm_cli.install.services import integrate_package_primitives + + target_pool = ["copilot", "claude", "cursor", "codex"] + targets = [KNOWN_TARGETS[name] for name in target_pool[: len(files_per_target)]] + integrator = _make_integrator_returning(files_per_target, adopted_per_target) + kwargs = _integrator_kwargs(integrator) + pkg = _make_pkg_info(tmp_path) + logger = MagicMock() + + with patch( + "apm_cli.integration.dispatch.get_dispatch_table", + return_value=_prompts_only_dispatch(), + ): + integrate_package_primitives( + pkg, + tmp_path, + targets=targets, + diagnostics=MagicMock(), + package_name="test-pkg", + logger=logger, + ctx=_ctx(), + force=False, + managed_files=None, + **kwargs, + ) + return _logger_lines(logger) + + def test_adopt_only_run_emits_summary_line(self, tmp_path: Path) -> None: + """The catch-22 reproducer: lockfile wiped, files still on disk + byte-identical to source. Pre-fix: zero summary lines for the + kind. Post-fix: ``N agents adopted -> ``. + """ + lines = self._run(tmp_path, files_per_target=[0], adopted_per_target=[3]) + prompt_lines = [ln for ln in lines if "agents adopted" in ln] + assert len(prompt_lines) == 1, ( + "Adopt-only run must still emit a per-kind summary line; " + "previously the line was suppressed and the install looked like a no-op." + ) + assert prompt_lines[0].startswith(" |-- 3 agents adopted -> ") + + def test_mixed_integrate_and_adopt_emits_combined_line(self, tmp_path: Path) -> None: + """Half the files freshly written, half adopted. The line must + lead with the integrated count and append the adopt count in + parens so the two phases are individually visible. + """ + lines = self._run(tmp_path, files_per_target=[2], adopted_per_target=[3]) + prompt_lines = [ln for ln in lines if "agents integrated" in ln] + assert len(prompt_lines) == 1 + assert "(3 adopted)" in prompt_lines[0] + assert prompt_lines[0].startswith(" |-- 2 agents integrated (3 adopted) -> ") + + def test_no_work_emits_no_line(self, tmp_path: Path) -> None: + """Belt-and-braces: zero integrated AND zero adopted -> still + no line (warm-cache annotation is a separate concern handled + by TestWarmCacheAnnotation). + """ + lines = self._run(tmp_path, files_per_target=[0], adopted_per_target=[0]) + assert not [ln for ln in lines if "agents integrated" in ln or "agents adopted" in ln] + + # --------------------------------------------------------------------------- # A3 -- (files unchanged) annotation when nothing was integrated # --------------------------------------------------------------------------- diff --git a/tests/unit/integration/test_content_identical_adopt.py b/tests/unit/integration/test_content_identical_adopt.py new file mode 100644 index 000000000..eb46c2084 --- /dev/null +++ b/tests/unit/integration/test_content_identical_adopt.py @@ -0,0 +1,447 @@ +"""Regression tests: silently adopt byte-identical pre-existing files. + +Closes the catch-22 where a degraded lockfile (missing ``deployed_files`` +for non-skill packages) could never self-heal because the per-file loops +in ``agent_integrator``, ``instruction_integrator``, ``prompt_integrator`` +and ``command_integrator`` treated content-identical files as +"user-authored" collisions, skipped them, and emitted an empty +``deployed_files`` -- which then tripped ``required-packages-deployed`` +on the next install. + +``skill_integrator._promote_sub_skills`` already had this short-circuit +(``target.exists()`` + ``_dirs_equal`` -> append + continue). These tests +lock in the symmetric behavior across non-skill primitives. + +Scenarios per integrator: + 1. Pre-existing target byte-identical to source + ``managed_files=None`` + -> silently adopted (target_path appended, files_skipped == 0). + 2. Pre-existing target with DIFFERENT content + ``managed_files=None`` + -> still treated as user-authored collision (existing behavior). + 3. Pure helper: ``BaseIntegrator.is_content_identical_to_source`` + returns the right answer for present/absent/identical/divergent + file pairs. +""" + +from datetime import datetime +from pathlib import Path + +from apm_cli.integration.agent_integrator import AgentIntegrator +from apm_cli.integration.base_integrator import BaseIntegrator +from apm_cli.integration.instruction_integrator import InstructionIntegrator +from apm_cli.integration.prompt_integrator import PromptIntegrator +from apm_cli.integration.targets import KNOWN_TARGETS +from apm_cli.models.apm_package import ( + APMPackage, + GitReferenceType, + PackageInfo, + ResolvedReference, +) + + +def _make_package_info(package_dir: Path, name: str = "test-pkg") -> PackageInfo: + package = APMPackage( + name=name, + version="1.0.0", + package_path=package_dir, + source=f"github.com/test/{name}", + ) + resolved_ref = ResolvedReference( + original_ref="main", + ref_type=GitReferenceType.BRANCH, + resolved_commit="abc123", + ref_name="main", + ) + return PackageInfo( + package=package, + install_path=package_dir, + resolved_reference=resolved_ref, + installed_at=datetime.now().isoformat(), + ) + + +# --------------------------------------------------------------------------- +# Pure helper +# --------------------------------------------------------------------------- + + +class TestIsContentIdenticalToSource: + def test_identical_files_return_true(self, tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + a.write_bytes(b"hello world\n") + b.write_bytes(b"hello world\n") + assert BaseIntegrator.is_content_identical_to_source(a, b) is True + + def test_divergent_files_return_false(self, tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + a.write_bytes(b"hello world\n") + b.write_bytes(b"different bytes\n") + assert BaseIntegrator.is_content_identical_to_source(a, b) is False + + def test_target_missing_returns_false(self, tmp_path: Path) -> None: + a = tmp_path / "missing" + b = tmp_path / "present" + b.write_bytes(b"x") + assert BaseIntegrator.is_content_identical_to_source(a, b) is False + + def test_source_missing_returns_false(self, tmp_path: Path) -> None: + a = tmp_path / "present" + b = tmp_path / "missing" + a.write_bytes(b"x") + assert BaseIntegrator.is_content_identical_to_source(a, b) is False + + +# --------------------------------------------------------------------------- +# Instruction integrator -- the user's reproducer (zava-storefront) +# --------------------------------------------------------------------------- + + +class TestInstructionIntegratorAdopt: + """Covers the secure-baseline scenario reported by zava-storefront.""" + + def _build(self, tmp_path: Path, source_bytes: bytes) -> tuple[Path, Path, PackageInfo]: + pkg_dir = tmp_path / "pkg" + inst_dir = pkg_dir / ".apm" / "instructions" + inst_dir.mkdir(parents=True) + source = inst_dir / "secure-coding.instructions.md" + source.write_bytes(source_bytes) + + deploy_dir = tmp_path / ".github" / "instructions" + deploy_dir.mkdir(parents=True) + target = deploy_dir / "secure-coding.instructions.md" + + return source, target, _make_package_info(pkg_dir) + + def test_identical_pre_existing_file_is_adopted_when_managed_none(self, tmp_path: Path) -> None: + """Lockfile lost deployed_files -> next install must adopt, not skip. + + This is the exact catch-22: file already on disk, byte-identical + to source, but absent from managed_files. Pre-fix: skipped, empty + deployed_files -> policy block. Post-fix: adopted, target_paths + populated -> deployed_files restored. + """ + body = b"---\napplyTo: '**/*.py'\n---\n# Secure coding base\n" + _, target, pkg_info = self._build(tmp_path, body) + target.write_bytes(body) # pre-existing, byte-identical + + result = InstructionIntegrator().integrate_instructions_for_target( + KNOWN_TARGETS["copilot"], + pkg_info, + tmp_path, + force=False, + managed_files=None, # <-- the degraded-lockfile state + ) + + assert target in result.target_paths, ( + "Byte-identical pre-existing file must be adopted into target_paths " + "so apm.lock.deployed_files repopulates and policy gate passes." + ) + assert result.files_skipped == 0, ( + "Adopted file must NOT count as skipped (would mislead users)." + ) + # Bytes preserved (no-op write or no write at all -- both acceptable). + assert target.read_bytes() == body + + def test_divergent_pre_existing_file_is_still_skipped(self, tmp_path: Path) -> None: + """User-authored content with different bytes keeps the existing + protection: skipped, content preserved.""" + source_body = b"---\napplyTo: '**/*.py'\n---\n# APM-provided\n" + user_body = b"# my hand-rolled rules\n" + _, target, pkg_info = self._build(tmp_path, source_body) + target.write_bytes(user_body) + + result = InstructionIntegrator().integrate_instructions_for_target( + KNOWN_TARGETS["copilot"], + pkg_info, + tmp_path, + force=False, + managed_files=None, + ) + + assert target not in result.target_paths + assert result.files_skipped >= 1 + assert target.read_bytes() == user_body, "User-authored content must not be overwritten." + + +# --------------------------------------------------------------------------- +# Agent integrator -- secure-baseline ships .agent.md too +# --------------------------------------------------------------------------- + + +class TestAgentIntegratorAdopt: + def _build(self, tmp_path: Path, body: bytes) -> tuple[Path, Path, PackageInfo]: + pkg_dir = tmp_path / "pkg" + agents_dir_src = pkg_dir / ".apm" / "agents" + agents_dir_src.mkdir(parents=True) + source = agents_dir_src / "security.agent.md" + source.write_bytes(body) + + deploy_dir = tmp_path / ".github" / "agents" + deploy_dir.mkdir(parents=True) + # copilot keeps the .agent.md suffix (no rename for primary target) + target = deploy_dir / "security.agent.md" + + return source, target, _make_package_info(pkg_dir) + + def test_identical_pre_existing_agent_is_adopted(self, tmp_path: Path) -> None: + body = b"---\nname: security\n---\n# Security agent\n" + _, target, pkg_info = self._build(tmp_path, body) + target.write_bytes(body) + + result = AgentIntegrator().integrate_agents_for_target( + KNOWN_TARGETS["copilot"], + pkg_info, + tmp_path, + force=False, + managed_files=None, + ) + + assert target in result.target_paths + assert result.files_skipped == 0 + assert target.read_bytes() == body + + +# --------------------------------------------------------------------------- +# Prompt integrator +# --------------------------------------------------------------------------- + + +class TestPromptIntegratorAdopt: + def test_identical_pre_existing_prompt_is_adopted(self, tmp_path: Path) -> None: + pkg_dir = tmp_path / "pkg" + prompts_src = pkg_dir / ".apm" / "prompts" + prompts_src.mkdir(parents=True) + body = b"---\nmode: agent\n---\n# Sample prompt\n" + (prompts_src / "sample.prompt.md").write_bytes(body) + + deploy_dir = tmp_path / ".github" / "prompts" + deploy_dir.mkdir(parents=True) + target = deploy_dir / "sample.prompt.md" + target.write_bytes(body) + + pkg_info = _make_package_info(pkg_dir) + result = PromptIntegrator().integrate_prompts_for_target( + KNOWN_TARGETS["copilot"], + pkg_info, + tmp_path, + force=False, + managed_files=None, + ) + + assert target in result.target_paths + assert result.files_skipped == 0 + + +# --------------------------------------------------------------------------- +# Symlink-rejection guard (TOCTOU defense in is_content_identical_to_source) +# --------------------------------------------------------------------------- + + +class TestIsContentIdenticalSymlinkGuard: + """The adopt branch fires *before* check_collision's containment + guard. Without symlink rejection, a co-tenant who pre-places a + symlink at the deploy path -- pointing to an out-of-project file + whose bytes match source -- could slip the symlink target into + ``deployed_files``. These tests lock in the guard. + """ + + def test_target_is_symlink_returns_false(self, tmp_path: Path) -> None: + body = b"identical bytes\n" + source = tmp_path / "source" + source.write_bytes(body) + # Pre-place a symlink at target whose link-target is byte-identical + decoy = tmp_path / "decoy_outside" + decoy.write_bytes(body) + target = tmp_path / "target" + target.symlink_to(decoy) + + # Both files exist; both have identical bytes when followed. + # Guard MUST refuse to adopt the symlink. + assert BaseIntegrator.is_content_identical_to_source(target, source) is False + + def test_source_is_symlink_returns_false(self, tmp_path: Path) -> None: + body = b"identical bytes\n" + target = tmp_path / "target" + target.write_bytes(body) + decoy = tmp_path / "decoy_outside" + decoy.write_bytes(body) + source = tmp_path / "source" + source.symlink_to(decoy) + + assert BaseIntegrator.is_content_identical_to_source(target, source) is False + + def test_identical_regular_files_still_adopt(self, tmp_path: Path) -> None: + """Regression trap: the symlink guard must not block the happy path.""" + body = b"identical bytes\n" + source = tmp_path / "source" + target = tmp_path / "target" + source.write_bytes(body) + target.write_bytes(body) + assert BaseIntegrator.is_content_identical_to_source(target, source) is True + + +# --------------------------------------------------------------------------- +# files_adopted counter -- visibility of silent-adopt work +# --------------------------------------------------------------------------- + + +class TestFilesAdoptedCounter: + """Pre-fix the adopt branch was invisible: target_paths grew but no + counter incremented and the install summary printed nothing in + adopt-only runs. These tests lock in the visibility contract. + """ + + def test_instruction_adopt_increments_files_adopted(self, tmp_path: Path) -> None: + body = b"---\napplyTo: '**/*.py'\n---\n# rule\n" + pkg_dir = tmp_path / "pkg" + inst_dir = pkg_dir / ".apm" / "instructions" + inst_dir.mkdir(parents=True) + (inst_dir / "x.instructions.md").write_bytes(body) + deploy_dir = tmp_path / ".github" / "instructions" + deploy_dir.mkdir(parents=True) + (deploy_dir / "x.instructions.md").write_bytes(body) + + result = InstructionIntegrator().integrate_instructions_for_target( + KNOWN_TARGETS["copilot"], + _make_package_info(pkg_dir), + tmp_path, + force=False, + managed_files=None, + ) + + assert result.files_adopted == 1 + assert result.files_integrated == 0 + assert result.files_skipped == 0 + + def test_prompt_adopt_increments_files_adopted(self, tmp_path: Path) -> None: + body = b"---\nmode: agent\n---\n# p\n" + pkg_dir = tmp_path / "pkg" + src = pkg_dir / ".apm" / "prompts" + src.mkdir(parents=True) + (src / "p.prompt.md").write_bytes(body) + deploy = tmp_path / ".github" / "prompts" + deploy.mkdir(parents=True) + (deploy / "p.prompt.md").write_bytes(body) + + result = PromptIntegrator().integrate_prompts_for_target( + KNOWN_TARGETS["copilot"], + _make_package_info(pkg_dir), + tmp_path, + force=False, + managed_files=None, + ) + assert result.files_adopted == 1 + assert result.files_integrated == 0 + + def test_agent_adopt_increments_files_adopted(self, tmp_path: Path) -> None: + body = b"---\nname: a\n---\n# a\n" + pkg_dir = tmp_path / "pkg" + src = pkg_dir / ".apm" / "agents" + src.mkdir(parents=True) + (src / "a.agent.md").write_bytes(body) + deploy = tmp_path / ".github" / "agents" + deploy.mkdir(parents=True) + (deploy / "a.agent.md").write_bytes(body) + + result = AgentIntegrator().integrate_agents_for_target( + KNOWN_TARGETS["copilot"], + _make_package_info(pkg_dir), + tmp_path, + force=False, + managed_files=None, + ) + assert result.files_adopted == 1 + assert result.files_integrated == 0 + + +# --------------------------------------------------------------------------- +# Legacy multi-target adopt: integrate_package_agents (cursor + claude) +# --------------------------------------------------------------------------- + + +class TestIntegratePackageAgentsAdopt: + """The legacy ``integrate_package_agents`` auto-fans agents to + .claude/agents/ and .cursor/agents/ when those dirs exist. Each fan + site has its own adopt branch; pre-fix none were tested. These + tests lock in the secondary adopt sites and the new + ``ensure_path_within`` containment guard added at each. + """ + + def _make_pkg(self, tmp_path: Path, body: bytes) -> tuple[Path, PackageInfo]: + pkg_dir = tmp_path / "pkg" + src = pkg_dir / ".apm" / "agents" + src.mkdir(parents=True) + (src / "sec.agent.md").write_bytes(body) + return pkg_dir, _make_package_info(pkg_dir) + + def test_claude_secondary_adopt_fires_for_byte_identical(self, tmp_path: Path) -> None: + body = b"---\nname: sec\n---\n# sec\n" + _pkg_dir, pkg_info = self._make_pkg(tmp_path, body) + + # Pre-create .claude/agents/ with byte-identical pre-existing file + claude_agents = tmp_path / ".claude" / "agents" + claude_agents.mkdir(parents=True) + claude_target = claude_agents / "sec.md" # claude strips .agent.md + claude_target.write_bytes(body) + + # Need the copilot deploy dir too (primary target) + (tmp_path / ".github" / "agents").mkdir(parents=True) + + result = AgentIntegrator().integrate_package_agents( + pkg_info, + tmp_path, + force=False, + managed_files=None, + ) + + assert claude_target in result.target_paths, ( + "Claude secondary adopt branch must fire for byte-identical pre-existing file" + ) + assert result.files_adopted >= 1 + + def test_cursor_secondary_adopt_fires_for_byte_identical(self, tmp_path: Path) -> None: + body = b"---\nname: sec\n---\n# sec\n" + _pkg_dir, pkg_info = self._make_pkg(tmp_path, body) + + cursor_agents = tmp_path / ".cursor" / "agents" + cursor_agents.mkdir(parents=True) + cursor_target = cursor_agents / "sec.md" + cursor_target.write_bytes(body) + + (tmp_path / ".github" / "agents").mkdir(parents=True) + + result = AgentIntegrator().integrate_package_agents( + pkg_info, + tmp_path, + force=False, + managed_files=None, + ) + + assert cursor_target in result.target_paths, ( + "Cursor secondary adopt branch must fire for byte-identical pre-existing file" + ) + assert result.files_adopted >= 1 + + def test_claude_secondary_skips_user_authored_divergent(self, tmp_path: Path) -> None: + body = b"---\nname: sec\n---\n# sec\n" + user_body = b"# user-authored claude agent\n" + _pkg_dir, pkg_info = self._make_pkg(tmp_path, body) + + claude_agents = tmp_path / ".claude" / "agents" + claude_agents.mkdir(parents=True) + claude_target = claude_agents / "sec.md" + claude_target.write_bytes(user_body) + + (tmp_path / ".github" / "agents").mkdir(parents=True) + + AgentIntegrator().integrate_package_agents( + pkg_info, + tmp_path, + force=False, + managed_files=None, + ) + + # User content preserved -- adopt didn't fire (divergent), and + # check_collision's force=False kept the file. + assert claude_target.read_bytes() == user_body diff --git a/tests/unit/integration/test_hook_integrator.py b/tests/unit/integration/test_hook_integrator.py index 723fa4263..c6be6b4da 100644 --- a/tests/unit/integration/test_hook_integrator.py +++ b/tests/unit/integration/test_hook_integrator.py @@ -3267,3 +3267,67 @@ def test_copy_rejects_traversal_target_rel(self, tmp_path: Path) -> None: target_script = project_root / ".." / ".." / "etc" / "passwd" ensure_path_within(target_script, project_root) + + +class TestHookScriptAdopt: + """Regression: hook integrator silently adopts byte-identical pre-existing scripts. + + Same catch-22 the non-skill integrators close: when ``apm.lock.yaml`` lost + ``deployed_files`` for a hook package and the referenced scripts are still + on disk byte-identical to source, the install must adopt them (append to + ``target_paths`` so they re-enter ``deployed_files``) instead of treating + them as user-authored collisions and skipping. Otherwise the next install + with ``required-packages-deployed`` enforced is permanently blocked. + """ + + @pytest.fixture + def temp_project(self): + temp_dir = tempfile.mkdtemp() + project = Path(temp_dir) + (project / ".github").mkdir() + yield project + shutil.rmtree(temp_dir, ignore_errors=True) + + def _setup_hookify_with_preexisting_scripts(self, project: Path) -> PackageInfo: + """Hookify package + pre-place all scripts at their target paths byte-identical.""" + pkg_dir = project / "apm_modules" / "anthropics" / "hookify" + hooks_dir = pkg_dir / "hooks" + hooks_dir.mkdir(parents=True, exist_ok=True) + (hooks_dir / "hooks.json").write_text(json.dumps(HOOKIFY_HOOKS_JSON, indent=2)) + scripts_dir_target = project / ".github" / "hooks" / "scripts" / "hookify" / "hooks" + scripts_dir_target.mkdir(parents=True, exist_ok=True) + for script in ["pretooluse.py", "posttooluse.py", "stop.py", "userpromptsubmit.py"]: + content = f"#!/usr/bin/env python3\n# {script}" + (hooks_dir / script).write_text(content) + (scripts_dir_target / script).write_text(content) + return _make_package_info(pkg_dir, "hookify") + + def test_vscode_adopts_byte_identical_scripts_with_no_managed_files(self, temp_project): + """managed_files=None + on-disk scripts byte-identical to source -> silent adopt.""" + pkg_info = self._setup_hookify_with_preexisting_scripts(temp_project) + integrator = HookIntegrator() + + result = integrator.integrate_package_hooks(pkg_info, temp_project, managed_files=None) + + # All four scripts must land in target_paths so deployed_files + # repopulates and the catch-22 self-heals. + scripts_dir = temp_project / ".github" / "hooks" / "scripts" / "hookify" / "hooks" + for script in ["pretooluse.py", "posttooluse.py", "stop.py", "userpromptsubmit.py"]: + assert (scripts_dir / script) in result.target_paths, ( + f"{script} missing from target_paths -- catch-22 reproduces for hook scripts" + ) + + def test_vscode_does_not_adopt_modified_scripts(self, temp_project): + """Pre-existing script with DIFFERENT content -> still treated as user-authored.""" + pkg_info = self._setup_hookify_with_preexisting_scripts(temp_project) + # Mutate one target script so it diverges from source. + scripts_dir_target = temp_project / ".github" / "hooks" / "scripts" / "hookify" / "hooks" + (scripts_dir_target / "pretooluse.py").write_text("#!/usr/bin/env python3\n# user edited") + + integrator = HookIntegrator() + result = integrator.integrate_package_hooks(pkg_info, temp_project, managed_files=None) + + # Adopt fires for the three identical scripts; the modified one is skipped. + for script in ["posttooluse.py", "stop.py", "userpromptsubmit.py"]: + assert (scripts_dir_target / script) in result.target_paths + assert (scripts_dir_target / "pretooluse.py") not in result.target_paths