diff --git a/.github/actions/generate-build-matrix/generate_matrix.py b/.github/actions/generate-build-matrix/generate_matrix.py index 50b5c5c64..b6d5932ea 100644 --- a/.github/actions/generate-build-matrix/generate_matrix.py +++ b/.github/actions/generate-build-matrix/generate_matrix.py @@ -1,47 +1,78 @@ +"""Generates a build matrix for CI based on the trigger event and user input.""" import json import os import re + +def get_default_combinations(event_name, all_combinations): + """Gets the default build combinations based on the GitHub event type.""" + if event_name in ("push", "pull_request", "pull_request_target"): + return ["gcc/none"] + elif event_name == "issue_comment": + return ["gcc/none", "clang/none"] + elif event_name == "workflow_dispatch": + return all_combinations + else: + # Default to a minimal safe configuration for unknown events + return ["gcc/none"] + + def main(): + """Generates and outputs the build matrix based on environment variables.""" all_combinations = [ - "gcc/none", "gcc/asan", "gcc/tsan", "gcc/valgrind", - "clang/none", "clang/asan", "clang/tsan", "clang/valgrind" + "gcc/none", + "gcc/asan", + "gcc/tsan", + "gcc/valgrind", + "clang/none", + "clang/asan", + "clang/tsan", + "clang/valgrind", ] - default_excluded = ["clang/none", "clang/valgrind"] - user_input = os.getenv("USER_INPUT", "") comment_body = os.getenv("COMMENT_BODY", "") event_name = os.getenv("GITHUB_EVENT_NAME") - input_str = "" - if event_name == "workflow_dispatch": - input_str = user_input - elif event_name == "issue_comment": + default_combinations = get_default_combinations(event_name, all_combinations) + + input_str = user_input + if event_name == "issue_comment": match = re.match(r"^@phlexbot build\s*(.*)", comment_body) if match: - input_str = match.group(1) + input_str = match.group(1).strip() tokens = [token for token in re.split(r"[\s,]+", input_str) if token] if not tokens: - final_combinations = [combo for combo in all_combinations if combo not in default_excluded] + # Case 3: No input. Use the trigger-specific default. + final_combinations = default_combinations else: - is_additive = any(token == "all" or token.startswith("+") or token.startswith("-") for token in tokens) - - if is_additive: - base_set = set(all_combinations if "all" in tokens else [combo for combo in all_combinations if combo not in default_excluded]) + # Check for explicit (non-modifier) combinations + explicit_combos = { + t for t in tokens if not (t.startswith("+") or t.startswith("-") or t == "all") + } - for token in tokens: - if token.startswith("+"): - base_set.add(token[1:]) - elif token.startswith("-"): - base_set.discard(token[1:]) - final_combinations = list(base_set) + if explicit_combos: + # Case 1: Explicit list. This forms the base set, ignoring defaults. + base_set = explicit_combos else: - final_combinations = tokens + # Case 2: Only modifiers. Determine base set from 'all' or defaults. + if "all" in tokens: + base_set = set(all_combinations) + else: + base_set = set(default_combinations) + + # Apply modifiers to the determined base set + for token in tokens: + if token.startswith("+"): + base_set.add(token[1:]) + elif token.startswith("-"): + base_set.discard(token[1:]) + + final_combinations = list(base_set) matrix = {"include": []} - for combo in sorted(list(set(final_combinations))): + for combo in sorted(set(final_combinations)): compiler, sanitizer = combo.split("/") matrix["include"].append({"compiler": compiler, "sanitizer": sanitizer}) @@ -49,5 +80,6 @@ def main(): with open(os.environ["GITHUB_OUTPUT"], "a") as f: print(f"matrix={json_matrix}", file=f) + if __name__ == "__main__": main() diff --git a/scripts/check_codeql_alerts.py b/scripts/check_codeql_alerts.py index 2d9d85e98..f469941f7 100644 --- a/scripts/check_codeql_alerts.py +++ b/scripts/check_codeql_alerts.py @@ -61,6 +61,7 @@ def _api_request( _debug(f"GitHub API HTTPError {exc.code} for {url}: {body[:200]}") raise GitHubAPIError(f"GitHub API {method} {url} failed with {exc.code}: {body}") from exc + LEVEL_ORDER = {"none": 0, "note": 1, "warning": 2, "error": 3} LEVEL_ICONS = { "error": ":x:", @@ -73,6 +74,7 @@ def _api_request( @dataclass class Alert: """Represents a CodeQL alert extracted from SARIF.""" + number: int | None html_url: str | None rule_id: str @@ -184,6 +186,7 @@ def _debug(msg: str) -> None: _LOG_PATH: Path | None = None + def _log(msg: str) -> None: """Append a timestamped message to the persistent log file. @@ -221,7 +224,9 @@ def _init_log(log_path: Path) -> None: return -def _paginate_alerts_api(owner: str, repo: str, *, state: str = "open", ref: str | None = None) -> collections.abc.Iterator[dict]: +def _paginate_alerts_api( + owner: str, repo: str, *, state: str = "open", ref: str | None = None +) -> collections.abc.Iterator[dict]: """Paginate Code Scanning alerts from the GitHub API for given repo and ref.""" page = 1 while True: @@ -299,7 +304,12 @@ def _to_alert_api(raw: dict) -> Alert: # instance properties fallback if not security_severity: inst_props = instance.get("properties") or {} - for key in ("security-severity", "securitySeverity", "problem.severity", "problemSeverity"): + for key in ( + "security-severity", + "securitySeverity", + "problem.severity", + "problemSeverity", + ): if inst_props.get(key): security_severity = str(inst_props.get(key)) break @@ -314,22 +324,24 @@ def _to_alert_api(raw: dict) -> Alert: help_uri=help_uri, security_severity=security_severity, dismissed_reason=(str(dismissed_reason) if dismissed_reason is not None else None), - analysis_key=(instance.get("analysis_key") or raw.get("analysis_key") or raw.get("fingerprint")), + analysis_key=( + instance.get("analysis_key") or raw.get("analysis_key") or raw.get("fingerprint") + ), ) # If location couldn't be determined, log the raw API alert for inspection if location == "(location unavailable)": - try: - snippet = { - "number": raw.get("number"), - "rule": raw.get("rule"), - "most_recent_instance": raw.get("most_recent_instance"), - "instances": raw.get("instances"), - } - _log(f"Unknown API alert location: {json.dumps(snippet, default=str)[:4000]}") - except (TypeError, OSError) as exc: - # Best-effort logging failed; print a short message in debug mode. - if DEBUG: - print(f"Failed to write API unknown-location snippet: {exc}", file=sys.stderr) + try: + snippet = { + "number": raw.get("number"), + "rule": raw.get("rule"), + "most_recent_instance": raw.get("most_recent_instance"), + "instances": raw.get("instances"), + } + _log(f"Unknown API alert location: {json.dumps(snippet, default=str)[:4000]}") + except (TypeError, OSError) as exc: + # Best-effort logging failed; print a short message in debug mode. + if DEBUG: + print(f"Failed to write API unknown-location snippet: {exc}", file=sys.stderr) return alert @@ -414,7 +426,9 @@ def extract_location(result: dict[str, Any]) -> str: location_str = uri return location_str # Try relatedLocations as a fallback - related_locations: collections.abc.Iterable[dict[str, Any]] = result.get("relatedLocations") or [] + related_locations: collections.abc.Iterable[dict[str, Any]] = ( + result.get("relatedLocations") or [] + ) for location in related_locations: phys = location.get("physicalLocation") or {} artifact = phys.get("artifactLocation") or {} @@ -433,7 +447,9 @@ def extract_location(result: dict[str, Any]) -> str: location_str = uri return location_str - logical_locations: collections.abc.Iterable[dict[str, Any]] = result.get("logicalLocations") or [] + logical_locations: collections.abc.Iterable[dict[str, Any]] = ( + result.get("logicalLocations") or [] + ) for logical in logical_locations: fq_name = logical.get("fullyQualifiedName") or logical.get("name") if fq_name: @@ -523,10 +539,15 @@ def collect_alerts( "locations": result.get("locations"), "relatedLocations": result.get("relatedLocations"), } - _log(f"Unknown SARIF location for result: {json.dumps(snippet, default=str)[:4000]}") + _log( + f"Unknown SARIF location for result: {json.dumps(snippet, default=str)[:4000]}" + ) except (TypeError, OSError) as exc: if DEBUG: - print(f"Failed to write SARIF unknown-location snippet: {exc}", file=sys.stderr) + print( + f"Failed to write SARIF unknown-location snippet: {exc}", + file=sys.stderr, + ) buckets[baseline_state].append(alert) return buckets @@ -549,13 +570,17 @@ def _format_section( prefix = f"# {alert.number} " else: prefix = "" - dismissed_note = f" (dismissed: {alert.dismissed_reason})" if alert.dismissed_reason else "" + dismissed_note = ( + f" (dismissed: {alert.dismissed_reason})" if alert.dismissed_reason else "" + ) lines.append( f"- {bullet_prefix} **{alert.level_title()}**{severity_note} {prefix}{alert.rule_display()}{dismissed_note} at `{alert.location}` — {alert.message}" ) if remaining: - lines.append(f"- {bullet_prefix} …and {remaining} more alerts (see Code Scanning for the full list).") + lines.append( + f"- {bullet_prefix} …and {remaining} more alerts (see Code Scanning for the full list)." + ) return lines @@ -568,6 +593,7 @@ def build_comment( threshold: str, ) -> str: lines: list[str] = [] + def _highest_severity(alerts: collections.abc.Sequence[Alert]) -> str | None: if not alerts: return None @@ -588,7 +614,11 @@ def _highest_severity(alerts: collections.abc.Sequence[Alert]) -> str | None: lines.append( f"## ✅ {len(fixed_alerts)} CodeQL alert{'s' if len(fixed_alerts) != 1 else ''} resolved since the previous run" ) - lines.extend(_format_section(fixed_alerts, max_results=max_results, bullet_prefix=":white_check_mark:")) + lines.extend( + _format_section( + fixed_alerts, max_results=max_results, bullet_prefix=":white_check_mark:" + ) + ) lines.append("") if repo: @@ -651,10 +681,16 @@ def _print_summary( highest_matched = highest_severity_level_title(matched_alerts) if matched_alerts else None print("CodeQL Summary:") - print(f"- New (unfiltered): {len(new_alerts)}{f' (Highest: {highest_new})' if highest_new else ''}") - print(f"- Fixed (unfiltered): {len(fixed_alerts)}{f' (Highest: {highest_fixed})' if highest_fixed else ''}") + print( + f"- New (unfiltered): {len(new_alerts)}{f' (Highest: {highest_new})' if highest_new else ''}" + ) + print( + f"- Fixed (unfiltered): {len(fixed_alerts)}{f' (Highest: {highest_fixed})' if highest_fixed else ''}" + ) if matched_available: - print(f"- Matched (preexisting): {len(matched_alerts)}{f' (Highest: {highest_matched})' if highest_matched else ''}") + print( + f"- Matched (preexisting): {len(matched_alerts)}{f' (Highest: {highest_matched})' if highest_matched else ''}" + ) else: print("- Matched (preexisting): N/A (no API comparison)") print("") @@ -718,12 +754,16 @@ def _compare_alerts_via_api(owner: str, repo: str, ref: str) -> APIAlertComparis if base_ref or base_sha: # prefer base SHA if available base_target = base_sha or base_ref - base_alerts_raw = list(_paginate_alerts_api(owner, repo, state="open", ref=base_target)) + base_alerts_raw = list( + _paginate_alerts_api(owner, repo, state="open", ref=base_target) + ) _debug(f"Fetched {len(base_alerts_raw)} alerts for base {base_target}") prev_alerts_raw: list[dict] = [] if prev_commit_ref: - prev_alerts_raw = list(_paginate_alerts_api(owner, repo, state="open", ref=prev_commit_ref)) + prev_alerts_raw = list( + _paginate_alerts_api(owner, repo, state="open", ref=prev_commit_ref) + ) _debug(f"Fetched {len(prev_alerts_raw)} alerts for prev commit {prev_commit_ref}") def alert_key(a: Alert) -> tuple[str, str]: @@ -827,22 +867,42 @@ def _build_multi_section_comment( # Add previous-commit comparison if available if api_comp.new_vs_prev: - lines.append(f"## ❌ {len(api_comp.new_vs_prev)} new CodeQL alert{'s' if len(api_comp.new_vs_prev) != 1 else ''} since the previous PR commit") - lines.extend(_format_section(api_comp.new_vs_prev, max_results=max_results, bullet_prefix=":x:")) + lines.append( + f"## ❌ {len(api_comp.new_vs_prev)} new CodeQL alert{'s' if len(api_comp.new_vs_prev) != 1 else ''} since the previous PR commit" + ) + lines.extend( + _format_section(api_comp.new_vs_prev, max_results=max_results, bullet_prefix=":x:") + ) lines.append("") if api_comp.fixed_vs_prev: - lines.append(f"## ✅ {len(api_comp.fixed_vs_prev)} CodeQL alert{'s' if len(api_comp.fixed_vs_prev) != 1 else ''} resolved since the previous PR commit") - lines.extend(_format_section(api_comp.fixed_vs_prev, max_results=max_results, bullet_prefix=":white_check_mark:")) + lines.append( + f"## ✅ {len(api_comp.fixed_vs_prev)} CodeQL alert{'s' if len(api_comp.fixed_vs_prev) != 1 else ''} resolved since the previous PR commit" + ) + lines.extend( + _format_section( + api_comp.fixed_vs_prev, max_results=max_results, bullet_prefix=":white_check_mark:" + ) + ) lines.append("") # Add branch-point comparison if available if api_comp.new_vs_base: - lines.append(f"## ❌ {len(api_comp.new_vs_base)} new CodeQL alert{'s' if len(api_comp.new_vs_base) != 1 else ''} since the branch point") - lines.extend(_format_section(api_comp.new_vs_base, max_results=max_results, bullet_prefix=":x:")) + lines.append( + f"## ❌ {len(api_comp.new_vs_base)} new CodeQL alert{'s' if len(api_comp.new_vs_base) != 1 else ''} since the branch point" + ) + lines.extend( + _format_section(api_comp.new_vs_base, max_results=max_results, bullet_prefix=":x:") + ) lines.append("") if api_comp.fixed_vs_base: - lines.append(f"## ✅ {len(api_comp.fixed_vs_base)} CodeQL alert{'s' if len(api_comp.fixed_vs_base) != 1 else ''} resolved since the branch point") - lines.extend(_format_section(api_comp.fixed_vs_base, max_results=max_results, bullet_prefix=":white_check_mark:")) + lines.append( + f"## ✅ {len(api_comp.fixed_vs_base)} CodeQL alert{'s' if len(api_comp.fixed_vs_base) != 1 else ''} resolved since the branch point" + ) + lines.extend( + _format_section( + api_comp.fixed_vs_base, max_results=max_results, bullet_prefix=":white_check_mark:" + ) + ) lines.append("") repo_str = os.environ.get("GITHUB_REPOSITORY") @@ -884,7 +944,9 @@ def main(argv: collections.abc.Sequence[str] | None = None) -> int: if not owner or not repo: repo_full = os.environ.get("GITHUB_REPOSITORY") if not repo_full: - print("GITHUB_REPOSITORY not set; please provide --owner and --repo", file=sys.stderr) + print( + "GITHUB_REPOSITORY not set; please provide --owner and --repo", file=sys.stderr + ) return 2 owner, repo = repo_full.split("/", 1) try: @@ -905,12 +967,14 @@ def main(argv: collections.abc.Sequence[str] | None = None) -> int: else: # SARIF-only mode: re-collect without threshold to get unfiltered counts try: - buckets_all = collect_alerts(sarif, min_level='none') - new_all = buckets_all.get('new', []) - fixed_all = buckets_all.get('absent', []) + buckets_all = collect_alerts(sarif, min_level="none") + new_all = buckets_all.get("new", []) + fixed_all = buckets_all.get("absent", []) except (ValueError, KeyError, TypeError, json.JSONDecodeError) as exc: # If re-collection fails (malformed SARIF or unexpected structure), fall back to thresholded lists - _debug(f"Re-collecting SARIF without threshold failed: {exc}; falling back to thresholded lists") + _debug( + f"Re-collecting SARIF without threshold failed: {exc}; falling back to thresholded lists" + ) new_all = new_alerts fixed_all = fixed_alerts matched_all = [] @@ -924,7 +988,19 @@ def main(argv: collections.abc.Sequence[str] | None = None) -> int: matched_available=matched_available, ) - if new_alerts or fixed_alerts or (api_comp and (api_comp.new_vs_prev or api_comp.fixed_vs_prev or api_comp.new_vs_base or api_comp.fixed_vs_base)): + if ( + new_alerts + or fixed_alerts + or ( + api_comp + and ( + api_comp.new_vs_prev + or api_comp.fixed_vs_prev + or api_comp.new_vs_base + or api_comp.fixed_vs_base + ) + ) + ): repo_str = os.environ.get("GITHUB_REPOSITORY") comment_body = "" if api_comp: diff --git a/scripts/codeql_reset_dismissed_alerts.py b/scripts/codeql_reset_dismissed_alerts.py index 1a7fc3ef0..f5c5a8d7b 100644 --- a/scripts/codeql_reset_dismissed_alerts.py +++ b/scripts/codeql_reset_dismissed_alerts.py @@ -35,9 +35,7 @@ class GitHubAPIError(RuntimeError): def _token() -> str: token = os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_TOKEN") if not token: - raise GitHubAPIError( - "Set GITHUB_TOKEN (or GH_TOKEN) with security_events:write scope." - ) + raise GitHubAPIError("Set GITHUB_TOKEN (or GH_TOKEN) with security_events:write scope.") return token @@ -97,6 +95,8 @@ def _paginate_alerts(owner: str, repo: str) -> Iterator[dict]: for alert in result: yield alert page += 1 + + @dataclass class Alert: number: int diff --git a/scripts/normalize_coverage_lcov.py b/scripts/normalize_coverage_lcov.py index eb469646b..36b33c5db 100644 --- a/scripts/normalize_coverage_lcov.py +++ b/scripts/normalize_coverage_lcov.py @@ -4,8 +4,8 @@ from __future__ import annotations import argparse -import sys import os +import sys from pathlib import Path @@ -23,7 +23,6 @@ def _relative_subpath(path: Path, base: Path | None) -> Path | None: Returns: The relative path if it exists within the base, otherwise None. """ - if base is None: return None @@ -64,7 +63,6 @@ def _is_repo_content(relative_path: Path) -> bool: Returns: True if the path should remain, False otherwise. """ - parts = relative_path.parts # Accept phlex/**, form/**, build-clang/**, .coverage-generated if len(parts) >= 1 and parts[0] in ("phlex", "form", "build-clang"): @@ -102,7 +100,6 @@ def normalize( resolved inside the repository and paths that live outside the repo, respectively. """ - repo_root = repo_root.absolute() repo_root_physical = repo_root.resolve() coverage_root_path = (coverage_root or repo_root).absolute() @@ -110,9 +107,7 @@ def normalize( coverage_root_relative = _relative_subpath(coverage_root_path, repo_root) if coverage_root_relative is None: - coverage_root_relative = _relative_subpath( - coverage_root_physical, repo_root_physical - ) + coverage_root_relative = _relative_subpath(coverage_root_physical, repo_root_physical) alias_symlink: Path | None = None alias_physical: Path | None = None @@ -122,9 +117,7 @@ def normalize( alias_physical = alias_symlink.resolve() alias_relative = _relative_subpath(alias_symlink, repo_root) if alias_relative is None: - alias_relative = _relative_subpath( - alias_physical, repo_root_physical - ) + alias_relative = _relative_subpath(alias_physical, repo_root_physical) missing: list[str] = [] external: list[str] = [] @@ -139,11 +132,7 @@ def flush_record(chunk: list[str]) -> None: return sf_index = next( - ( - idx - for idx, value in enumerate(chunk) - if value.startswith("SF:") - ), + (idx for idx, value in enumerate(chunk) if value.startswith("SF:")), None, ) if sf_index is None: @@ -199,15 +188,10 @@ def _add_candidate(path: Path) -> None: relative = subpath break - if ( - relative is None - and coverage_root_relative is not None - ): + if relative is None and coverage_root_relative is not None: subpath = _relative_subpath(candidate, coverage_root_path) if subpath is None: - subpath = _relative_subpath( - candidate, coverage_root_physical - ) + subpath = _relative_subpath(candidate, coverage_root_physical) if subpath is not None: relative = coverage_root_relative / subpath @@ -259,8 +243,7 @@ def parse_args(argv: list[str]) -> argparse.Namespace: """ parser = argparse.ArgumentParser( description=( - "Normalize LCOV coverage files so that source file paths match " - "the repository layout." + "Normalize LCOV coverage files so that source file paths match the repository layout." ) ) parser.add_argument( @@ -278,26 +261,18 @@ def parse_args(argv: list[str]) -> argparse.Namespace: "--coverage-root", type=Path, default=None, - help=( - "Root directory used when the report was generated (default: repo " - "root)" - ), + help=("Root directory used when the report was generated (default: repo root)"), ) parser.add_argument( "--coverage-alias", type=Path, default=None, - help=( - "Alternate path inside the workspace that should map to repo root" - ), + help=("Alternate path inside the workspace that should map to repo root"), ) parser.add_argument( "--absolute-paths", action="store_true", - help=( - "Emit absolute paths rooted at --repo-root instead of relative " - "paths" - ), + help=("Emit absolute paths rooted at --repo-root instead of relative paths"), ) return parser.parse_args(argv) @@ -314,12 +289,8 @@ def main(argv: list[str]) -> int: args = parse_args(argv) report = args.report.resolve() repo_root = args.repo_root.absolute() - coverage_root = ( - args.coverage_root.absolute() if args.coverage_root else repo_root - ) - coverage_alias = ( - args.coverage_alias.absolute() if args.coverage_alias else None - ) + coverage_root = args.coverage_root.absolute() if args.coverage_root else repo_root + coverage_alias = args.coverage_alias.absolute() if args.coverage_alias else None missing, external = normalize( report, @@ -343,9 +314,7 @@ def main(argv: list[str]) -> int: if missing: joined = "\n".join(f" - {path}" for path in missing) sys.stderr.write( - "LCOV report references files that do not exist inside the " - "repository:\n" - f"{joined}\n" + f"LCOV report references files that do not exist inside the repository:\n{joined}\n" ) return 1 diff --git a/scripts/normalize_coverage_xml.py b/scripts/normalize_coverage_xml.py index 035d300e6..cb1053748 100644 --- a/scripts/normalize_coverage_xml.py +++ b/scripts/normalize_coverage_xml.py @@ -4,10 +4,10 @@ from __future__ import annotations import argparse -import sys import os -from pathlib import Path +import sys import xml.etree.ElementTree as ET +from pathlib import Path def _relative_subpath(path: Path, base: Path | None) -> Path | None: @@ -25,7 +25,6 @@ def _relative_subpath(path: Path, base: Path | None) -> Path | None: Returns: The relative path if it exists within the base, otherwise None. """ - if base is None: return None @@ -76,7 +75,6 @@ def normalize( A tuple containing missing files (relative to repo_root) and external files that reside outside the repository. """ - tree = ET.parse(report_path) root = tree.getroot() @@ -110,15 +108,11 @@ def normalize( coverage_root_relative = _relative_subpath(coverage_root, repo_root) if coverage_root_relative is None: - coverage_root_relative = _relative_subpath( - coverage_root_resolved, repo_root_resolved - ) + coverage_root_relative = _relative_subpath(coverage_root_resolved, repo_root_resolved) repo_relative_from_coverage = _relative_subpath(repo_root, coverage_root) if repo_relative_from_coverage is None: - repo_relative_from_coverage = _relative_subpath( - repo_root_resolved, coverage_root_resolved - ) + repo_relative_from_coverage = _relative_subpath(repo_root_resolved, coverage_root_resolved) def _usable_prefix(prefix: Path | None) -> Path | None: if prefix is None: @@ -243,28 +237,19 @@ def parse_args(argv: list[str]) -> argparse.Namespace: parser.add_argument( "report", type=Path, - help=( - "Path to the Cobertura XML report " - "(e.g., phlex-build/coverage.xml)" - ), + help=("Path to the Cobertura XML report (e.g., phlex-build/coverage.xml)"), ) parser.add_argument( "--repo-root", type=Path, default=Path.cwd(), - help=( - "Repository root as seen by Codecov " - "(default: current directory)" - ), + help=("Repository root as seen by Codecov (default: current directory)"), ) parser.add_argument( "--coverage-root", type=Path, default=None, - help=( - "Root used when gcovr generated the report " - "(default: --repo-root)" - ), + help=("Root used when gcovr generated the report (default: --repo-root)"), ) parser.add_argument( "--coverage-alias", @@ -279,10 +264,7 @@ def parse_args(argv: list[str]) -> argparse.Namespace: "--source-dir", type=Path, default=None, - help=( - "Directory to store inside " - "(default: same as --repo-root)" - ), + help=("Directory to store inside (default: same as --repo-root)"), ) parser.add_argument( "--path-map", @@ -311,20 +293,14 @@ def main(argv: list[str]) -> int: args = parse_args(argv) report = args.report.resolve() repo_root = args.repo_root.absolute() - coverage_root = ( - args.coverage_root.absolute() if args.coverage_root else repo_root - ) - coverage_alias = ( - args.coverage_alias.absolute() if args.coverage_alias else None - ) + coverage_root = args.coverage_root.absolute() if args.coverage_root else repo_root + coverage_alias = args.coverage_alias.absolute() if args.coverage_alias else None source_dir = args.source_dir.absolute() if args.source_dir else repo_root path_maps: list[tuple[Path, Path]] = [] for mapping in args.path_map: if "=" not in mapping: - raise SystemExit( - f"Invalid --path-map '{mapping}'. Expected format FROM=TO." - ) + raise SystemExit(f"Invalid --path-map '{mapping}'. Expected format FROM=TO.") src_raw, dst_raw = mapping.split("=", 1) src_path = Path(src_raw) if not src_path.is_absolute(): @@ -361,9 +337,7 @@ def main(argv: list[str]) -> int: if missing: joined = "\n".join(f" - {name}" for name in missing) sys.stderr.write( - "Coverage XML references files that do not exist in the " - "repository:\n" - f"{joined}\n" + f"Coverage XML references files that do not exist in the repository:\n{joined}\n" ) return 1 diff --git a/scripts/sarif-alerts.py b/scripts/sarif-alerts.py index fb90e9a85..151e1eda9 100644 --- a/scripts/sarif-alerts.py +++ b/scripts/sarif-alerts.py @@ -26,7 +26,9 @@ def _process_sarif(path: Path) -> Iterable[str]: def main(argv=None) -> int: - parser = argparse.ArgumentParser(description="Print SARIF results from one or more SARIF files") + parser = argparse.ArgumentParser( + description="Print SARIF results from one or more SARIF files" + ) parser.add_argument("files", nargs="+", help="Path(s) to SARIF file(s) to inspect") args = parser.parse_args(argv) diff --git a/test/python/adder.py b/test/python/adder.py index 190b49c1a..a37b2c136 100644 --- a/test/python/adder.py +++ b/test/python/adder.py @@ -6,4 +6,3 @@ }""") add = cppyy.gbl.test.add - diff --git a/test/python/pyphlex.py b/test/python/pyphlex.py index 04028565c..99a82acd2 100644 --- a/test/python/pyphlex.py +++ b/test/python/pyphlex.py @@ -1,5 +1,4 @@ -""" -Minimal access to Phlex from cppyy. +"""Minimal access to Phlex from cppyy. cppyy needs access to core Phlex classes to complete the registration, which is done through header parsing. Only the relevant header files are included @@ -7,16 +6,17 @@ executable, which is already appropriately linked. """ -import cppyy import os +import cppyy + # locate phlex through the environment PHLEX_INSTALL = os.environ["PHLEX_INSTALL"] # load phlex headers includedir = os.path.join(PHLEX_INSTALL, "include") if not os.path.exists(includedir): - includedir = PHLEX_INSTALL + includedir = PHLEX_INSTALL cppyy.add_include_path(includedir) cppyy.include("phlex/core/framework_graph.hpp") cppyy.include("phlex/configuration.hpp") @@ -24,13 +24,14 @@ # open phlex namespace for convenience in JITed C++ snippets cppyy.cppdef("using namespace phlex::experimental;") + # `with` is a keyword in Python, so can not be the name of a method; fix this # by renaming it to `with_` for all phlex classes def fixwith(klass, name): try: - klass.with_ = getattr(klass, 'with') + klass.with_ = getattr(klass, "with") except AttributeError: pass # nothing to fix -cppyy.py.add_pythonization(fixwith, "phlex::experimental") +cppyy.py.add_pythonization(fixwith, "phlex::experimental") diff --git a/test/python/registry.py b/test/python/registry.py index 757d04438..6d6db51bc 100644 --- a/test/python/registry.py +++ b/test/python/registry.py @@ -1,7 +1,7 @@ import cppyy import pyphlex # noqa: F401 -__all__ = ['pyphlex'] +__all__ = ["pyphlex"] cpp = cppyy.gbl phlex = cpp.phlex.experimental @@ -9,8 +9,9 @@ _registered_modules = dict() + def register(m, config): - config = cppyy.bind_object(config, 'phlex::experimental::configuration') + config = cppyy.bind_object(config, "phlex::experimental::configuration") pymod_name = str(config.get["std::string"]("pymodule")) pyalg_name = str(config.get["std::string"]("pyalg")) @@ -25,6 +26,5 @@ def register(m, config): pyalg = getattr(pymod, pyalg_name) - graph = cppyy.bind_object(m, 'phlex::experimental::graph_proxy') - graph.with_(pyalg_name, pyalg, phlex.concurrency.serial).transform(*inputs).to(*outputs); - + graph = cppyy.bind_object(m, "phlex::experimental::graph_proxy") + graph.with_(pyalg_name, pyalg, phlex.concurrency.serial).transform(*inputs).to(*outputs) diff --git a/test/python/test_phlex.py b/test/python/test_phlex.py index 769b44911..52d964e28 100644 --- a/test/python/test_phlex.py +++ b/test/python/test_phlex.py @@ -1,15 +1,12 @@ - - class TestPYPHLEX: - @classmethod def setup_class(cls): import pyphlex # noqa: F401 - __all__ = ['pyphlex'] # For CodeQL + + __all__ = ["pyphlex"] # For CodeQL def test01_phlex_existence(self): """Test existence of the phlex namespace""" - import cppyy assert cppyy.gbl.phlex