From 2d7a38734bce01b0b95a18963918141796beb69f Mon Sep 17 00:00:00 2001 From: Fawx AI Date: Thu, 26 Mar 2026 01:59:32 +0000 Subject: [PATCH] feat: add public promotion guard Adds a promotion guard script that validates branches before public PRs. Checks for blocked paths, non-allowlisted files, private markers, workflow leaks, and public metadata regressions. Includes 16 regression tests covering all guard check paths. --- scripts/README.md | 18 + scripts/check-public-promotion | 12 + scripts/check-public-promotion.toml | 98 ++++ scripts/check_public_promotion.py | 484 +++++++++++++++++++ scripts/tests/test_check_public_promotion.py | 353 ++++++++++++++ 5 files changed, 965 insertions(+) create mode 100644 scripts/README.md create mode 100755 scripts/check-public-promotion create mode 100644 scripts/check-public-promotion.toml create mode 100755 scripts/check_public_promotion.py create mode 100644 scripts/tests/test_check_public_promotion.py diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 00000000..861764be --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,18 @@ +# Scripts + +## Public Promotion Guard + +Use this from a promotion branch that is based on `public/main` after cherry-picking only OSS-safe commits. + +### Run Locally + +Python 3.11+ is required for the guard and its regression tests because `check_public_promotion.py` uses `tomllib`. + +- Fetch the public base ref if needed: + - `git fetch public main` +- Run the guard: + - `scripts/check-public-promotion` +- Run the guard regression tests: + - `python3 -m unittest scripts/tests/test_check_public_promotion.py` + +The guard compares the current branch against `public/main`, fails on blocked or non-allowlisted paths, scans added lines for private markers, and checks a few public invariants before you open a public PR. diff --git a/scripts/check-public-promotion b/scripts/check-public-promotion new file mode 100755 index 00000000..8c3c6ad3 --- /dev/null +++ b/scripts/check-public-promotion @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +from pathlib import Path +import sys + +SCRIPT_DIR = Path(__file__).resolve().parent +sys.path.insert(0, str(SCRIPT_DIR)) + +from check_public_promotion import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/check-public-promotion.toml b/scripts/check-public-promotion.toml new file mode 100644 index 00000000..1af1fecc --- /dev/null +++ b/scripts/check-public-promotion.toml @@ -0,0 +1,98 @@ +base_ref = "public/main" +review_warning_file_count = 40 +review_warning_area_count = 4 + +allowlist = [ + ".cargo/**", + ".github/workflows/ci.yml", + "assets/**", + "bindings/**", + "engine/**", + "tui/**", + "scripts/check-public-promotion", + "scripts/check-public-promotion.toml", + "scripts/check_public_promotion.py", + "scripts/README.md", + "scripts/tests/**", + "Cargo.toml", + "Cargo.lock", + "install.sh", + "README.md", + "CONTRIBUTING.md", + "ARCHITECTURE.md", + "LICENSE", + "ENGINEERING.md", + "TASTE.md", + "DOCTRINE.md", + "docs/README.md", + "docs/SPEC.md", + "docs/WASM_SKILLS.md", + "docs/assets/**", + "docs/architecture/**", + "docs/decisions/**", + "docs/legal/**", + "docs/oss-extraction-checklist.md", +] + +blocklist = [ + "app/**", + "docs/strategy/**", + "memory/**", + "AGENTS.md", + "BOOTSTRAP.md", + "SECURITY.md", + "SOUL.md", + "USER.md", + "IDENTITY.md", + "WORKFLOW_AUTO.md", + "HEARTBEAT.md", + ".ci/**", + "docs/roadmap.html", + "docs/test-results/**", + "scripts/build-dmg.sh", + "scripts/build-dmg-config.example.sh", + "scripts/release.sh", + "scripts/imported/**", + "scripts/squad/**", + ".github/workflows/android-atomic-nightly.yml", + ".github/workflows/claude.yml", +] + +author_private_patterns = [ + "\\bJoe\\b", + "\\bJoseph\\b", + "\\babbudjoe\\b", + "\\bclawdio\\b", +] + +[[markers]] +name = "private repo reference" +pattern = "\\babbudjoe/fawx\\b" + +[[markers]] +name = "internal assistant name" +pattern = "\\bclawdio\\b" + +[[markers]] +name = "Tailscale hostname" +pattern = "\\b[a-z0-9-]+(?:\\.[a-z0-9-]+)+\\.ts\\.net\\b" + +[[markers]] +name = "Tailscale IPv4" +pattern = "\\b100\\.(?:6[4-9]|[7-9][0-9]|1[01][0-9]|12[0-7])(?:\\.\\d{1,3}){2}\\b" + +[[markers]] +name = "suspicious credential token" +pattern = "\\b(?:ghp_[A-Za-z0-9]{20,}|github_pat_[A-Za-z0-9_]{20,}|sk-[A-Za-z0-9_-]{12,})\\b" + +[[workflow_private_markers]] +name = "private repo reference" +pattern = "\\babbudjoe/" + +[[workflow_private_markers]] +name = "private host or tailnet endpoint" +pattern = "(?:\\.ts\\.net\\b|tailscale_https|wss://[^\\s\"']*\\.ts\\.net\\b)" + +[[workflow_private_markers]] +name = "internal IP address" +pattern = "\\b(?:10(?:\\.\\d{1,3}){3}|192\\.168(?:\\.\\d{1,3}){2}|172\\.(?:1[6-9]|2[0-9]|3[01])(?:\\.\\d{1,3}){2}|100\\.(?:6[4-9]|[7-9][0-9]|1[01][0-9]|12[0-7])(?:\\.\\d{1,3}){2})\\b" diff --git a/scripts/check_public_promotion.py b/scripts/check_public_promotion.py new file mode 100755 index 00000000..a844c060 --- /dev/null +++ b/scripts/check_public_promotion.py @@ -0,0 +1,484 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import re +import subprocess +import sys +import tomllib +from dataclasses import dataclass +from fnmatch import fnmatchcase +from pathlib import Path +from typing import Sequence + +HUNK_HEADER_RE = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? @@") +AUTHOR_FIELD_RE = re.compile(r"\bauthors?\b\s*=") + + +class GuardError(RuntimeError): + """Raised when the guard cannot evaluate the repository safely.""" + + +@dataclass(frozen=True) +class NamedPattern: + name: str + pattern: re.Pattern[str] + + +@dataclass(frozen=True) +class AddedLine: + path: str + line_number: int + text: str + + +@dataclass(frozen=True) +class Finding: + path: str + message: str + line_number: int | None = None + excerpt: str | None = None + + +@dataclass(frozen=True) +class GuardConfig: + base_ref: str + allowlist: tuple[str, ...] + blocklist: tuple[str, ...] + marker_patterns: tuple[NamedPattern, ...] + author_private_patterns: tuple[re.Pattern[str], ...] + workflow_private_patterns: tuple[NamedPattern, ...] + review_warning_file_count: int + review_warning_area_count: int + + +@dataclass(frozen=True) +class DiffContext: + base_ref: str + changed_paths: tuple[str, ...] + active_paths: tuple[str, ...] + added_lines: tuple[AddedLine, ...] + + +@dataclass(frozen=True) +class Report: + context: DiffContext + blocked_paths: tuple[str, ...] + allowlist_misses: tuple[str, ...] + marker_findings: tuple[Finding, ...] + invariant_findings: tuple[Finding, ...] + warnings: tuple[str, ...] + + @property + def failed(self) -> bool: + return any( + [ + self.blocked_paths, + self.allowlist_misses, + self.marker_findings, + self.invariant_findings, + ] + ) + + +def main() -> int: + try: + report = build_report(Path(__file__).resolve().parent) + except GuardError as error: + print("check-public-promotion: FAIL") + print() + print(error) + return 1 + + print(render_report(report)) + return 1 if report.failed else 0 + + +def build_report(script_dir: Path) -> Report: + config = load_config(script_dir / "check-public-promotion.toml") + repo_root = resolve_repo_root(script_dir) + context = collect_diff_context(repo_root, config.base_ref) + blocked_paths = find_matching_paths(context.changed_paths, config.blocklist) + allowlist_misses = find_allowlist_misses( + context.changed_paths, + config.allowlist, + blocked_paths, + ) + marker_findings = scan_added_lines(context.added_lines, config.marker_patterns) + invariant_findings = collect_invariant_findings(repo_root, context, config) + warnings = build_warnings(context.changed_paths, config) + return Report( + context=context, + blocked_paths=blocked_paths, + allowlist_misses=allowlist_misses, + marker_findings=marker_findings, + invariant_findings=invariant_findings, + warnings=warnings, + ) + + +def load_config(config_path: Path) -> GuardConfig: + with config_path.open("rb") as handle: + raw = tomllib.load(handle) + + return GuardConfig( + base_ref=raw["base_ref"], + allowlist=tuple(raw["allowlist"]), + blocklist=tuple(raw["blocklist"]), + marker_patterns=compile_named_patterns(raw["markers"]), + author_private_patterns=compile_patterns(raw["author_private_patterns"]), + workflow_private_patterns=compile_named_patterns(raw["workflow_private_markers"]), + review_warning_file_count=int(raw["review_warning_file_count"]), + review_warning_area_count=int(raw["review_warning_area_count"]), + ) + + +def compile_named_patterns(raw_patterns: Sequence[dict[str, str]]) -> tuple[NamedPattern, ...]: + compiled = [] + for entry in raw_patterns: + compiled.append(NamedPattern(entry["name"], re.compile(entry["pattern"]))) + return tuple(compiled) + + +def compile_patterns(raw_patterns: Sequence[str]) -> tuple[re.Pattern[str], ...]: + return tuple(re.compile(pattern) for pattern in raw_patterns) + + +def resolve_repo_root(script_dir: Path) -> Path: + output = git_stdout(script_dir, ["rev-parse", "--show-toplevel"]) + return Path(output.strip()) + + +def collect_diff_context(repo_root: Path, base_ref: str) -> DiffContext: + ensure_ref_exists(repo_root, base_ref) + diff_range = f"{base_ref}...HEAD" + changed_paths = tuple(git_lines(repo_root, ["diff", "--name-only", diff_range])) + active_paths = tuple( + git_lines(repo_root, ["diff", "--name-only", "--diff-filter=ACMR", diff_range]) + ) + added_lines = tuple(collect_added_lines(repo_root, diff_range, active_paths)) + return DiffContext(base_ref, changed_paths, active_paths, added_lines) + + +def ensure_ref_exists(repo_root: Path, ref_name: str) -> None: + result = subprocess.run( + ["git", "rev-parse", "--verify", f"{ref_name}^{{commit}}"], + cwd=repo_root, + text=True, + capture_output=True, + check=False, + ) + if result.returncode == 0: + return + raise GuardError( + f"Base ref '{ref_name}' is missing. Fetch it first, for example: git fetch public main" + ) + + +def collect_added_lines( + repo_root: Path, + diff_range: str, + active_paths: Sequence[str], +) -> list[AddedLine]: + if not active_paths: + return [] + diff_text = git_stdout( + repo_root, + ["diff", "--unified=0", "--no-color", diff_range, "--", *active_paths], + ) + return parse_added_lines(diff_text, set(active_paths)) + + +def parse_added_lines(diff_text: str, active_paths: set[str]) -> list[AddedLine]: + findings: list[AddedLine] = [] + current_path: str | None = None + line_number: int | None = None + for raw_line in diff_text.splitlines(): + if raw_line.startswith("diff --git "): + current_path = None + line_number = None + continue + path = parse_diff_path(raw_line, active_paths) + if path is not None: + current_path = path + line_number = None + continue + header = HUNK_HEADER_RE.match(raw_line) + if header: + line_number = int(header.group(1)) + continue + if raw_line == r"\ No newline at end of file": + continue + if current_path is None or line_number is None: + continue + if raw_line.startswith("+"): + findings.append(AddedLine(current_path, line_number, raw_line[1:])) + line_number += 1 + continue + if raw_line.startswith("-"): + continue + line_number += 1 + return findings + + +def parse_diff_path(raw_line: str, active_paths: set[str]) -> str | None: + if not raw_line.startswith("+++ "): + return None + path = strip_diff_prefix(raw_line[4:]) + if path == "/dev/null" or path not in active_paths: + return None + return path + + +def strip_diff_prefix(path: str) -> str: + if path.startswith(("a/", "b/")): + return path[2:] + return path + + +def git_stdout(repo_root: Path, args: Sequence[str]) -> str: + result = subprocess.run( + ["git", *args], + cwd=repo_root, + text=True, + capture_output=True, + check=False, + ) + if result.returncode == 0: + return result.stdout + message = result.stderr.strip() or result.stdout.strip() or "git command failed" + raise GuardError(message) + + +def git_lines(repo_root: Path, args: Sequence[str]) -> list[str]: + return [line for line in git_stdout(repo_root, args).splitlines() if line.strip()] + + +def find_matching_paths(paths: Sequence[str], patterns: Sequence[str]) -> tuple[str, ...]: + return tuple(path for path in paths if any(path_matches(path, pattern) for pattern in patterns)) + + +def find_allowlist_misses( + paths: Sequence[str], + allowlist: Sequence[str], + blocked_paths: Sequence[str], +) -> tuple[str, ...]: + blocked_set = set(blocked_paths) + misses = [] + for path in paths: + if path in blocked_set: + continue + if any(path_matches(path, pattern) for pattern in allowlist): + continue + misses.append(path) + return tuple(misses) + + +def path_matches(path: str, pattern: str) -> bool: + if fnmatchcase(path, pattern): + return True + return pattern.endswith("/**") and path == pattern[:-3] + + +def scan_added_lines( + added_lines: Sequence[AddedLine], + patterns: Sequence[NamedPattern], +) -> tuple[Finding, ...]: + findings: list[Finding] = [] + for added_line in added_lines: + match = first_named_match(added_line.text, patterns) + if match is None: + continue + findings.append( + Finding( + path=added_line.path, + line_number=added_line.line_number, + message=match.name, + excerpt=added_line.text.strip(), + ) + ) + return tuple(findings) + + +def first_named_match(text: str, patterns: Sequence[NamedPattern]) -> NamedPattern | None: + for pattern in patterns: + if pattern.pattern.search(text): + return pattern + return None + + +def collect_invariant_findings( + repo_root: Path, + context: DiffContext, + config: GuardConfig, +) -> tuple[Finding, ...]: + findings: list[Finding] = [] + findings.extend(check_llama_reintroduction(repo_root, context)) + findings.extend(check_author_metadata(context.added_lines, config)) + findings.extend(check_workflow_refs(context.added_lines, config)) + return tuple(findings) + + +def check_llama_reintroduction( + repo_root: Path, + context: DiffContext, +) -> tuple[Finding, ...]: + if ref_has_path(repo_root, context.base_ref, "engine/crates/llama-cpp-sys/Cargo.toml"): + return () + + findings = [] + for added_line in context.added_lines: + if "llama-cpp-sys" not in added_line.text: + continue + findings.append( + Finding( + path=added_line.path, + line_number=added_line.line_number, + message="llama-cpp-sys is absent from the base ref and should not be reintroduced", + excerpt=added_line.text.strip(), + ) + ) + return tuple(findings) + + +def ref_has_path(repo_root: Path, ref_name: str, repo_path: str) -> bool: + result = subprocess.run( + ["git", "cat-file", "-e", f"{ref_name}:{repo_path}"], + cwd=repo_root, + text=True, + capture_output=True, + check=False, + ) + return result.returncode == 0 + + +def check_author_metadata( + added_lines: Sequence[AddedLine], + config: GuardConfig, +) -> tuple[Finding, ...]: + findings = [] + for added_line in added_lines: + if not is_author_metadata_file(added_line.path): + continue + if not AUTHOR_FIELD_RE.search(added_line.text): + continue + if not any(pattern.search(added_line.text) for pattern in config.author_private_patterns): + continue + findings.append( + Finding( + path=added_line.path, + line_number=added_line.line_number, + message="author metadata should stay public-safe (for example: Fawx AI)", + excerpt=added_line.text.strip(), + ) + ) + return tuple(findings) + + +def is_author_metadata_file(path: str) -> bool: + return path.endswith("Cargo.toml") or path.endswith("manifest.toml") + + +def check_workflow_refs( + added_lines: Sequence[AddedLine], + config: GuardConfig, +) -> tuple[Finding, ...]: + findings = [] + for added_line in added_lines: + if not added_line.path.startswith(".github/workflows/"): + continue + match = first_named_match(added_line.text, config.workflow_private_patterns) + if match is None: + continue + findings.append( + Finding( + path=added_line.path, + line_number=added_line.line_number, + message=f"public workflow references {match.name}", + excerpt=added_line.text.strip(), + ) + ) + return tuple(findings) + + +def build_warnings(paths: Sequence[str], config: GuardConfig) -> tuple[str, ...]: + areas = top_level_areas(paths) + is_broad = len(paths) > config.review_warning_file_count + is_wide = len(areas) > config.review_warning_area_count + if not is_broad and not is_wide: + return () + warning = ( + f"{len(paths)} changed files across {len(areas)} top-level areas; " + "confirm this promotion is intentionally scoped." + ) + return (warning,) + + +def top_level_areas(paths: Sequence[str]) -> tuple[str, ...]: + areas = set() + for path in paths: + parts = Path(path).parts + areas.add(parts[0] if parts else path) + return tuple(sorted(areas)) + + +def render_report(report: Report) -> str: + lines = [ + f"check-public-promotion: {'FAIL' if report.failed else 'PASS'}", + "", + f"Base ref: {report.context.base_ref}", + f"Changed files: {len(report.context.changed_paths)}", + ] + append_path_section(lines, "Blocked paths", report.blocked_paths) + append_path_section(lines, "Allowlist misses", report.allowlist_misses) + append_finding_section(lines, "Private markers", report.marker_findings) + append_finding_section(lines, "Public invariants", report.invariant_findings) + append_warning_section(lines, report.warnings) + if report.failed: + append_suggested_actions(lines) + return "\n".join(lines) + + +def append_path_section(lines: list[str], title: str, entries: Sequence[str]) -> None: + if not entries: + return + lines.extend(["", f"{title}:"]) + lines.extend(f"- {entry}" for entry in entries) + + +def append_finding_section(lines: list[str], title: str, findings: Sequence[Finding]) -> None: + if not findings: + return + lines.extend(["", f"{title}:"]) + lines.extend(f"- {format_finding(finding)}" for finding in findings) + + +def append_warning_section(lines: list[str], warnings: Sequence[str]) -> None: + if not warnings: + return + lines.extend(["", "Warnings:"]) + lines.extend(f"- {warning}" for warning in warnings) + + +def append_suggested_actions(lines: list[str]) -> None: + lines.extend( + [ + "", + "Suggested action:", + "- split mixed commits into a narrower promotion branch", + "- remove blocked or non-allowlisted files from the promotion diff", + "- scrub private markers or private metadata and rerun the guard", + ] + ) + + +def format_finding(finding: Finding) -> str: + location = finding.path + if finding.line_number is not None: + location = f"{location}:{finding.line_number}" + if finding.excerpt: + return f"{location} [{finding.message}] {finding.excerpt}" + return f"{location} [{finding.message}]" + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/tests/test_check_public_promotion.py b/scripts/tests/test_check_public_promotion.py new file mode 100644 index 00000000..152fba06 --- /dev/null +++ b/scripts/tests/test_check_public_promotion.py @@ -0,0 +1,353 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import shutil +import subprocess +import sys +import tempfile +import unittest +from pathlib import Path + +SOURCE_ROOT = Path(__file__).resolve().parents[2] +GUARD_FILES = [ + "scripts/check-public-promotion", + "scripts/check-public-promotion.toml", + "scripts/check_public_promotion.py", +] +WORKFLOW_PREFIX = ( + "name: CI\n" + "on: [push]\n" + "jobs:\n" + " check:\n" + " runs-on: ubuntu-latest\n" + " steps:\n" + " - run: " +) + + +def workflow_file(command: str) -> str: + return WORKFLOW_PREFIX + command + "\n" + + +class CheckPublicPromotionTests(unittest.TestCase): + def setUp(self) -> None: + self.temp_dir = tempfile.TemporaryDirectory() + self.repo_root = Path(self.temp_dir.name) / "repo" + self.repo_root.mkdir() + + def tearDown(self) -> None: + self.temp_dir.cleanup() + + def test_blocked_private_path_fails(self) -> None: + repo = self.prepare_repo( + base_files={"engine/crates/fx-core/src/lib.rs": "pub fn base() {}\n"}, + changed_files={ + "app/Fawx/SecretView.swift": 'let token = "hidden"\n', + "engine/crates/fx-core/src/lib.rs": "pub fn base() {}\npub fn next() {}\n", + }, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Blocked paths:", result.stdout) + self.assertIn("app/Fawx/SecretView.swift", result.stdout) + + def test_allowlist_miss_fails(self) -> None: + repo = self.prepare_repo( + base_files={"engine/crates/fx-core/src/lib.rs": "pub fn base() {}\n"}, + changed_files={"notes/private.md": "should not go public\n"}, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Allowlist misses:", result.stdout) + self.assertIn("notes/private.md", result.stdout) + + def test_private_marker_in_added_line_fails(self) -> None: + repo = self.prepare_repo( + base_files={"engine/crates/fx-core/src/lib.rs": "pub fn base() {}\n"}, + changed_files={ + "engine/crates/fx-core/src/lib.rs": ( + "pub fn base() {}\n" + 'pub const REPO: &str = "https://github.com/abbudjoe/fawx";\n' + ) + }, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Private markers:", result.stdout) + self.assertIn("private repo reference", result.stdout) + + def test_credential_token_marker_fails(self) -> None: + repo = self.prepare_repo( + base_files={"engine/crates/fx-core/src/lib.rs": "pub fn base() {}\n"}, + changed_files={ + "engine/crates/fx-core/src/lib.rs": ( + "pub fn base() {}\n" + 'pub const TOKEN: &str = "ghp_TESTTOKEN1234567890ABCD";\n' + ) + }, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Private markers:", result.stdout) + self.assertIn("suspicious credential token", result.stdout) + + def test_tailscale_hostname_marker_fails(self) -> None: + repo = self.prepare_repo( + base_files={"engine/crates/fx-core/src/lib.rs": "pub fn base() {}\n"}, + changed_files={ + "engine/crates/fx-core/src/lib.rs": ( + "pub fn base() {}\n" + 'pub const HOST: &str = "relay.tail9696fb.ts.net";\n' + ) + }, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Private markers:", result.stdout) + self.assertIn("Tailscale hostname", result.stdout) + + def test_tailscale_ipv4_marker_fails(self) -> None: + repo = self.prepare_repo( + base_files={"engine/crates/fx-core/src/lib.rs": "pub fn base() {}\n"}, + changed_files={ + "engine/crates/fx-core/src/lib.rs": ( + "pub fn base() {}\n" + 'pub const HOST: &str = "100.89.174.76";\n' + ) + }, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Private markers:", result.stdout) + self.assertIn("Tailscale IPv4", result.stdout) + + def test_public_author_invariant_fails(self) -> None: + repo = self.prepare_repo( + base_files={"Cargo.toml": '[workspace.package]\nauthors = ["Fawx AI"]\n'}, + changed_files={"Cargo.toml": '[workspace.package]\nauthors = ["Joe"]\n'}, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Public invariants:", result.stdout) + self.assertIn("author metadata should stay public-safe", result.stdout) + + def test_llama_reintroduction_fails_when_absent_from_base(self) -> None: + repo = self.prepare_repo( + base_files={ + "engine/crates/fx-core/Cargo.toml": ( + "[package]\n" + 'name = "fx-core"\n' + 'version = "0.1.0"\n' + ) + }, + changed_files={ + "engine/crates/fx-core/Cargo.toml": ( + "[package]\n" + 'name = "fx-core"\n' + 'version = "0.1.0"\n\n' + "[dependencies]\n" + 'llama-cpp-sys = "0.1"\n' + ) + }, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Public invariants:", result.stdout) + self.assertIn("llama-cpp-sys is absent from the base ref", result.stdout) + + def test_workflow_private_ip_fails(self) -> None: + repo = self.prepare_repo( + base_files={}, + changed_files={".github/workflows/ci.yml": workflow_file("echo 10.1.2.3")}, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Public invariants:", result.stdout) + self.assertIn("public workflow references internal IP address", result.stdout) + + def test_workflow_tailscale_endpoint_fails(self) -> None: + repo = self.prepare_repo( + base_files={}, + changed_files={ + ".github/workflows/ci.yml": workflow_file( + "echo wss://clawdio.tail9696fb.ts.net/socket" + ) + }, + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("Public invariants:", result.stdout) + self.assertIn( + "public workflow references private host or tailnet endpoint", + result.stdout, + ) + + def test_public_workflow_websocket_url_passes(self) -> None: + repo = self.prepare_repo( + base_files={}, + changed_files={".github/workflows/ci.yml": workflow_file("echo wss://example.com/socket")}, + ) + + result = self.run_guard(repo) + + self.assertEqual(result.returncode, 0, result.stdout + result.stderr) + self.assertNotIn("Public invariants:", result.stdout) + + def test_workflow_localhost_ip_passes(self) -> None: + repo = self.prepare_repo( + base_files={}, + changed_files={".github/workflows/ci.yml": workflow_file("echo 127.0.0.1")}, + ) + + result = self.run_guard(repo) + + self.assertEqual(result.returncode, 0, result.stdout + result.stderr) + self.assertNotIn("Public invariants:", result.stdout) + + def test_broad_promotion_warns(self) -> None: + changed_files = { + f"engine/crates/fx-core/src/file_{index}.rs": f"pub fn item_{index}() {{}}\n" + for index in range(41) + } + repo = self.prepare_repo(base_files={}, changed_files=changed_files) + + result = self.run_guard(repo) + + self.assertEqual(result.returncode, 0, result.stdout + result.stderr) + self.assertIn("Warnings:", result.stdout) + self.assertIn("41 changed files across 1 top-level areas", result.stdout) + + def test_wide_promotion_warns(self) -> None: + repo = self.prepare_repo( + base_files={}, + changed_files={ + ".cargo/config.toml": "[build]\n", + "assets/logo.txt": "logo\n", + "bindings/public.h": "// header\n", + "engine/crates/fx-core/src/lib.rs": "pub fn public_change() {}\n", + "tui/src/main.rs": "fn main() {}\n", + }, + ) + + result = self.run_guard(repo) + + self.assertEqual(result.returncode, 0, result.stdout + result.stderr) + self.assertIn("Warnings:", result.stdout) + self.assertIn("5 changed files across 5 top-level areas", result.stdout) + + def test_missing_base_ref_fails_loudly(self) -> None: + repo = self.prepare_repo( + base_files={"engine/crates/fx-core/src/lib.rs": "pub fn base() {}\n"}, + changed_files={"engine/crates/fx-core/src/lib.rs": "pub fn base() {}\npub fn next() {}\n"}, + ) + config_path = repo / "scripts/check-public-promotion.toml" + config_text = config_path.read_text(encoding="utf-8") + config_path.write_text( + config_text.replace( + 'base_ref = "public/main"', + 'base_ref = "public/missing"', + 1, + ), + encoding="utf-8", + ) + + result = self.run_guard(repo) + + self.assertNotEqual(result.returncode, 0) + self.assertIn("check-public-promotion: FAIL", result.stdout) + self.assertIn("Base ref 'public/missing' is missing", result.stdout) + + def test_safe_promotion_passes(self) -> None: + repo = self.prepare_repo( + base_files={ + "Cargo.toml": '[workspace.package]\nauthors = ["Fawx AI"]\n', + "engine/crates/fx-core/src/lib.rs": "pub fn base() {}\n", + }, + changed_files={ + "engine/crates/fx-core/src/lib.rs": ( + "pub fn base() {}\n" + "pub fn public_change() -> &'static str {\n" + ' "ready"\n' + "}\n" + ) + }, + ) + + result = self.run_guard(repo) + + self.assertEqual(result.returncode, 0, result.stdout + result.stderr) + self.assertIn("check-public-promotion: PASS", result.stdout) + + def prepare_repo( + self, + base_files: dict[str, str], + changed_files: dict[str, str], + ) -> Path: + self.copy_guard_files(self.repo_root) + self.git("init") + self.git("config", "user.email", "tests@example.com") + self.git("config", "user.name", "Promotion Guard Tests") + self.write_files(base_files) + self.git("add", ".") + self.git("commit", "-m", "base") + self.git("branch", "public/main") + self.git("checkout", "-b", "promote/test", "public/main") + self.write_files(changed_files) + self.git("add", "-A") + self.git("commit", "-m", "candidate") + return self.repo_root + + def copy_guard_files(self, repo_root: Path) -> None: + for relative_path in GUARD_FILES: + source = SOURCE_ROOT / relative_path + target = repo_root / relative_path + target.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(source, target) + + def write_files(self, files: dict[str, str]) -> None: + for relative_path, content in files.items(): + target = self.repo_root / relative_path + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(content, encoding="utf-8") + + def git(self, *args: str) -> None: + subprocess.run( + ["git", *args], + cwd=self.repo_root, + text=True, + capture_output=True, + check=True, + ) + + def run_guard(self, repo_root: Path) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [sys.executable, str(repo_root / "scripts/check-public-promotion")], + cwd=repo_root, + text=True, + capture_output=True, + check=False, + ) + + +if __name__ == "__main__": + unittest.main()