diff --git a/.github/workflows/flakiness_detector.yml b/.github/workflows/flakiness_detector.yml new file mode 100644 index 00000000..4108da57 --- /dev/null +++ b/.github/workflows/flakiness_detector.yml @@ -0,0 +1,183 @@ +name: Flakiness Detector + +# Runs after any CI workflow completes and can also be triggered manually for testing. +on: + workflow_run: + workflows: ["PR Validation", "Test Data Display"] + types: [completed] + workflow_dispatch: + inputs: + workflow_run_id: + description: 'Workflow run ID to analyse (required for manual trigger)' + required: true + type: string + repo: + description: 'Repository in owner/repo format' + required: false + default: 'OWASP-BLT/BLT-Leaf' + pr_number: + description: 'PR number (optional)' + required: false + type: string + +permissions: + issues: write # create / reopen / comment on flaky-test issues + pull-requests: write # post flakiness summary comment on PRs + actions: write # trigger rerun-failed-jobs + +jobs: + detect: + name: Detect & Record Flakiness + runs-on: ubuntu-latest + + # Skip runs triggered by this workflow's own bot commits to prevent loops + if: > + github.event_name == 'workflow_dispatch' || + (github.event.workflow_run.conclusion != null && + !contains(github.event.workflow_run.head_commit.message, '[skip ci]')) + + steps: + # ----------------------------------------------------------------------- + # 1. Checkout + # ----------------------------------------------------------------------- + - name: Checkout repository + uses: actions/checkout@v4 + + # ----------------------------------------------------------------------- + # 2. Python + dependencies + # ----------------------------------------------------------------------- + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install Python dependencies + run: pip install --quiet requests pyyaml + + # ----------------------------------------------------------------------- + # 3. 
Resolve run metadata (works for both trigger types)
      # -----------------------------------------------------------------------
      - name: Resolve workflow run metadata
        id: meta
        env:
          # Pass the pull_requests payload in via the environment instead of
          # interpolating ${{ toJson(...) }} directly into the script body:
          # head branch names inside this JSON are attacker-controllable and a
          # single quote in one would break out of the shell quoting.
          PULL_REQUESTS_JSON: ${{ toJson(github.event.workflow_run.pull_requests) }}
        run: |
          if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then
            echo "run_id=${{ github.event.inputs.workflow_run_id }}" >> "$GITHUB_OUTPUT"
            echo "repo=${{ github.event.inputs.repo }}" >> "$GITHUB_OUTPUT"
            echo "pr_number=${{ github.event.inputs.pr_number }}" >> "$GITHUB_OUTPUT"
          else
            echo "run_id=${{ github.event.workflow_run.id }}" >> "$GITHUB_OUTPUT"
            echo "repo=${{ github.repository }}" >> "$GITHUB_OUTPUT"
            # Extract PR number from workflow_run context (empty string if not a PR run)
            PR=$(printf '%s' "$PULL_REQUESTS_JSON" \
              | python3 -c "import json,sys; prs=json.load(sys.stdin); print(prs[0]['number'] if prs else '')")
            echo "pr_number=${PR}" >> "$GITHUB_OUTPUT"
          fi

      # -----------------------------------------------------------------------
      # 4. Collect CI results → writes ci_run_history rows to D1, outputs failed jobs
      # -----------------------------------------------------------------------
      - name: Collect CI results
        id: collect
        env:
          CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
          CLOUDFLARE_D1_DATABASE_ID: ${{ secrets.CLOUDFLARE_D1_DATABASE_ID }}
          CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }}
        run: |
          # pipefail: without it a collector crash is masked by tee's exit 0
          # and the job would continue with a truncated/empty JSON file.
          set -o pipefail
          python scripts/flakiness/collect_ci_results.py \
            --workflow-run-id "${{ steps.meta.outputs.run_id }}" \
            --repo "${{ steps.meta.outputs.repo }}" \
            --github-token "${{ secrets.GITHUB_TOKEN }}" \
            ${{ steps.meta.outputs.pr_number != '' && format('--pr-number {0}', steps.meta.outputs.pr_number) || '' }} \
            | tee /tmp/collect_output.json

          # Expose whether any test failures were found.
          # NOTE: single-line -c on purpose — a multi-line python3 -c string in a
          # YAML block scalar keeps its relative indentation, which python -c
          # rejects with IndentationError.
          FAILED=$(python3 -c "import json; d = json.load(open('/tmp/collect_output.json')); print('true' if d.get('failed_jobs') else 'false')")
          echo "has_failures=${FAILED}" >> "$GITHUB_OUTPUT"
# -----------------------------------------------------------------------
      # 5. Retry failed jobs (only when there are first-attempt failures)
      # -----------------------------------------------------------------------
      - name: Retry failed jobs
        id: retry
        if: steps.collect.outputs.has_failures == 'true'
        env:
          CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
          CLOUDFLARE_D1_DATABASE_ID: ${{ secrets.CLOUDFLARE_D1_DATABASE_ID }}
          CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }}
        run: |
          # pipefail: a retry-script failure must not be hidden by tee's exit 0,
          # otherwise step 6 would re-collect results that were never retried.
          set -o pipefail
          python scripts/flakiness/retry_failures.py \
            --workflow-run-id "${{ steps.meta.outputs.run_id }}" \
            --repo "${{ steps.meta.outputs.repo }}" \
            --github-token "${{ secrets.GITHUB_TOKEN }}" \
            --collect-output /tmp/collect_output.json \
            | tee /tmp/retry_output.json

      # -----------------------------------------------------------------------
      # 6. Re-collect after retry so flake_confirmed rows are recorded in D1
      # -----------------------------------------------------------------------
      - name: Re-collect after retry
        if: steps.collect.outputs.has_failures == 'true'
        env:
          CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
          CLOUDFLARE_D1_DATABASE_ID: ${{ secrets.CLOUDFLARE_D1_DATABASE_ID }}
          CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }}
        run: |
          python scripts/flakiness/collect_ci_results.py \
            --workflow-run-id "${{ steps.meta.outputs.run_id }}" \
            --repo "${{ steps.meta.outputs.repo }}" \
            --github-token "${{ secrets.GITHUB_TOKEN }}" \
            ${{ steps.meta.outputs.pr_number != '' && format('--pr-number {0}', steps.meta.outputs.pr_number) || '' }} \
            > /dev/null

      # -----------------------------------------------------------------------
      # 7.
Analyse flakiness → upserts flakiness_scores in D1, outputs classification
      # -----------------------------------------------------------------------
      - name: Analyse flakiness
        id: analyze
        env:
          CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
          CLOUDFLARE_D1_DATABASE_ID: ${{ secrets.CLOUDFLARE_D1_DATABASE_ID }}
          CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }}
        run: |
          # pipefail: otherwise an analyzer crash is masked by tee's exit 0 and
          # step 8 would report from a stale or empty /tmp/flaky_report.json.
          set -o pipefail
          python scripts/flakiness/analyze_flakiness.py \
            --repo "${{ steps.meta.outputs.repo }}" \
            | tee /tmp/flaky_report.json

          # NOTE: single-line -c on purpose — a multi-line python3 -c string in
          # a YAML block scalar keeps its relative indentation, which python -c
          # rejects with IndentationError.
          FLAKY_COUNT=$(python3 -c "import json; print(len(json.load(open('/tmp/flaky_report.json')).get('flaky', [])))")
          echo "flaky_count=${FLAKY_COUNT}" >> "$GITHUB_OUTPUT"

      # -----------------------------------------------------------------------
      # 8. Report flakiness to GitHub Issues / PR comment, write local files
      # -----------------------------------------------------------------------
      - name: Report flakiness
        env:
          CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
          CLOUDFLARE_D1_DATABASE_ID: ${{ secrets.CLOUDFLARE_D1_DATABASE_ID }}
          CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_API_TOKEN }}
        run: |
          python scripts/flakiness/report_flakiness.py \
            --repo "${{ steps.meta.outputs.repo }}" \
            --github-token "${{ secrets.GITHUB_TOKEN }}" \
            ${{ steps.meta.outputs.pr_number != '' && format('--pr-number {0}', steps.meta.outputs.pr_number) || '' }} \
            --flaky-report /tmp/flaky_report.json

      # -----------------------------------------------------------------------
      # 9.
Upload report artifacts (flakiness_report.md + flakiness_metrics.json) + # ----------------------------------------------------------------------- + - name: Upload flakiness artifacts + if: always() + uses: actions/upload-artifact@v4 + with: + name: flakiness-report + path: | + data/flakiness_report.md + data/flakiness_metrics.json + if-no-files-found: ignore diff --git a/.gitignore b/.gitignore index 9e1da51e..d12d0e83 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,7 @@ dist/ *.pyc __pycache__/ .pytest_cache/ + +# Flakiness pipeline artifacts +data/flakiness_report.md +data/flakiness_metrics.json diff --git a/migrations/0004_create_flakiness_tables.sql b/migrations/0004_create_flakiness_tables.sql new file mode 100644 index 00000000..40a3fe1f --- /dev/null +++ b/migrations/0004_create_flakiness_tables.sql @@ -0,0 +1,64 @@ +-- Migration: Create flakiness detection tables +-- Created: 2026-03-11 +-- Description: Add tables for CI run history, flakiness scores, and known infrastructure issue patterns + +-- Per-run record for every CI job execution +CREATE TABLE IF NOT EXISTS ci_run_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + check_name TEXT NOT NULL, -- name of the CI check / job step + job_name TEXT NOT NULL, -- GitHub Actions job name + workflow_name TEXT NOT NULL, -- workflow display name (e.g. 
"CI") + workflow_run_id INTEGER NOT NULL, -- github.run_id + run_attempt INTEGER NOT NULL DEFAULT 1, -- 1 = first run, 2+ = retry + status TEXT NOT NULL, -- 'pass' | 'fail' | 'skip' + conclusion_category TEXT NOT NULL, -- 'test_failure' | 'infrastructure' | 'flake_confirmed' | 'pass' | 'skip' + commit_sha TEXT NOT NULL, + pr_number INTEGER, -- NULL when not a PR-triggered run + repo TEXT NOT NULL, -- "owner/repo" format + timestamp TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE INDEX IF NOT EXISTS idx_ci_run_history_lookup + ON ci_run_history(check_name, job_name, repo, timestamp); + +-- Computed flakiness scores per (repo, workflow_name, check_name, job_name) tuple +CREATE TABLE IF NOT EXISTS flakiness_scores ( + repo TEXT NOT NULL, -- "owner/repo" format + workflow_name TEXT NOT NULL, + check_name TEXT NOT NULL, + job_name TEXT NOT NULL, + flakiness_score REAL NOT NULL DEFAULT 0.0, -- 0.0 – 1.0 + severity TEXT NOT NULL DEFAULT 'stable', -- 'stable' | 'low' | 'medium' | 'high' | 'deterministic' + classification TEXT NOT NULL DEFAULT 'stable', -- 'stable' | 'flaky' | 'deterministic' + total_runs INTEGER NOT NULL DEFAULT 0, + failure_count INTEGER NOT NULL DEFAULT 0, + flaky_failures INTEGER NOT NULL DEFAULT 0, -- failures that passed on re-run + consecutive_failures INTEGER NOT NULL DEFAULT 0, -- current streak of consecutive failures + last_updated TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (repo, workflow_name, check_name, job_name) +); + +CREATE INDEX IF NOT EXISTS idx_flakiness_scores_repo + ON flakiness_scores(repo); +CREATE INDEX IF NOT EXISTS idx_flakiness_scores_lookup + ON flakiness_scores(repo, workflow_name, check_name, job_name); + +-- Seed patterns used to classify infrastructure failures separately from test flakiness +CREATE TABLE IF NOT EXISTS known_infrastructure_issues ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pattern TEXT NOT NULL UNIQUE, + category TEXT NOT NULL DEFAULT 'infrastructure', + description TEXT, + created_at TEXT 
NOT NULL DEFAULT (datetime('now')) +); +INSERT OR IGNORE INTO known_infrastructure_issues (pattern, category, description) VALUES + ('ECONNRESET', 'infrastructure', 'TCP connection reset — transient network issue'), + ('timed_out', 'infrastructure', 'GitHub Actions step conclusion: timed_out'), + ('timeout', 'infrastructure', 'Generic timeout — network or infrastructure issue'), + ('rate limit', 'infrastructure', 'API or package registry rate limit hit'), + ('ETIMEDOUT', 'infrastructure', 'TCP connection timed out'), + ('fetch failed', 'infrastructure', 'Network fetch failure — transient'), + ('network error', 'infrastructure', 'Generic network error'), + ('Could not resolve host', 'infrastructure', 'DNS resolution failure'), + ('dependency', 'infrastructure', 'Dependency installation or resolution failure'), + ('upstream', 'infrastructure', 'Upstream service or dependency issue'); \ No newline at end of file diff --git a/package.json b/package.json index 9459b7f5..7c0f40ca 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,8 @@ "lint": "echo 'Linting not configured yet' && exit 0", "format:check": "echo 'Format checking not configured yet' && exit 0", "test": "node test-data-display.js", - "test:data-display": "node test-data-display.js" + "test:data-display": "node test-data-display.js", + "test:flakiness": "python -m unittest discover -s scripts/flakiness/tests -p \"test_*.py\" -v" }, "keywords": [ "github", diff --git a/scripts/flakiness/analyze_flakiness.py b/scripts/flakiness/analyze_flakiness.py new file mode 100644 index 00000000..7e3fdd96 --- /dev/null +++ b/scripts/flakiness/analyze_flakiness.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +""" +Analyze CI run history to detect flaky tests and compute flakiness scores. 
+ +Reads from ci_run_history in Cloudflare D1, upserts results into +flakiness_scores, and outputs a JSON report to stdout: + { + "flaky": [{"check_name": ..., "flakiness_score": ..., "severity": ..., ...}], + "deterministic": [...], + "stable": [...] + } + +Usage: + python analyze_flakiness.py --repo owner/repo [--check-name "job name"] +""" + +import argparse +import json +import os +import sys + +sys.path.insert(0, os.path.dirname(__file__)) +from db_utils import get_d1_credentials, d1_query, d1_select, load_config + + +def _get_severity(score, config): + """Map a flakiness score (0–1) to a severity label using configured bands.""" + sev = config.get('severity', {}) + low_band = sev.get('low', [0.10, 0.20]) + medium_band = sev.get('medium', [0.20, 0.40]) + + if score < low_band[0]: + return 'stable' + if score < medium_band[0]: + return 'low' + if score < medium_band[1]: + return 'medium' + return 'high' + + +def analyze_check(rows, config): + """ + Classify a single (check_name, job_name) pair given its ordered run history. + + Returns a dict with classification, score, severity, and stats, + or None if there are insufficient runs. + + Classification rules (in priority order): + 1. < min_runs total non-infra runs → None (skip) + 2. Last N consecutive failures → deterministic + 3. failure_rate > flaky_max_rate → deterministic + 4. flaky_min_rate <= failure_rate <= max → flaky + 5. 
Otherwise → stable + """ + thresholds = config.get('thresholds', {}) + window = thresholds.get('window_size', 20) + min_runs = thresholds.get('min_runs', 5) + flaky_min = thresholds.get('flaky_min_rate', 0.10) + flaky_max = thresholds.get('flaky_max_rate', 0.60) + consec_det = thresholds.get('consecutive_failures_deterministic', 3) + + # Only count non-infrastructure runs toward the analysis window + relevant = [r for r in rows if r['conclusion_category'] != 'infrastructure'] + # Take the most recent `window` runs + window_rows = relevant[-window:] + total = len(window_rows) + + if total < min_runs: + return None + + failures = [r for r in window_rows if r['status'] == 'fail'] + flaky_confs = [r for r in window_rows if r['conclusion_category'] == 'flake_confirmed'] + failure_count = len(failures) + flaky_count = len(flaky_confs) + failure_rate = failure_count / total + + # Count consecutive failures from the most recent end (leading streak) + consecutive = 0 + for r in reversed(window_rows): + if r['status'] == 'fail' and r['conclusion_category'] == 'test_failure': + consecutive += 1 + else: + break + + # Classify + if consecutive >= consec_det: + classification = 'deterministic' + elif failure_rate > flaky_max: + classification = 'deterministic' + elif failure_rate >= flaky_min and flaky_count > 0: + classification = 'flaky' + else: + classification = 'stable' + + # Flakiness score: proportion of failures in the window + # (only meaningful for flaky checks; set to 0 for stable/deterministic) + if classification == 'flaky': + flakiness_score = round(failure_rate, 4) + severity = _get_severity(flakiness_score, config) + else: + flakiness_score = 0.0 + severity = classification # 'stable' or 'deterministic' + + return { + 'classification': classification, + 'flakiness_score': flakiness_score, + 'severity': severity, + 'total_runs': total, + 'failure_count': failure_count, + 'flaky_failures': flaky_count, + 'consecutive_failures': consecutive, + } + + +def main(): + 
parser = argparse.ArgumentParser(description='Analyze flakiness from CI run history') + parser.add_argument('--repo', required=True, help='owner/repo') + parser.add_argument('--check-name', default=None, + help='Limit analysis to a single check name') + parser.add_argument('--dry-run', action='store_true', + help='Use synthetic 20-run history, skip all D1 calls') + args = parser.parse_args() + + config = load_config() + report = {'flaky': [], 'deterministic': [], 'stable': []} + + if args.dry_run: + print('[dry-run] Dry run enabled — using synthetic 20-run history, skipping D1', + file=sys.stderr) + + def _p(): + return {'status': 'pass', 'conclusion_category': 'pass'} + + def _f(): + return {'status': 'fail', 'conclusion_category': 'test_failure'} + + # test-suite: 17 pass + 3 fail (scattered) → 15% → flaky/low + # build: 20 pass → 0% → stable + # integration-tests: 18 pass + 2 fail (scattered) → 10% → flaky/low + synthetic = [ + { + 'check_name': 'test-suite', 'job_name': 'test-suite', + 'workflow_name': 'PR Validation', + 'history': ( + [_p()] * 6 + [_f()] + [_p()] * 4 + [_f()] + [_p()] * 5 + [_f()] + [_p()] * 2 + ), + }, + { + 'check_name': 'build', 'job_name': 'build', + 'workflow_name': 'PR Validation', + 'history': [_p()] * 20, + }, + { + 'check_name': 'integration-tests', 'job_name': 'integration-tests', + 'workflow_name': 'PR Validation', + 'history': [_p()] * 9 + [_f()] + [_p()] * 9 + [_f()], + }, + ] + + for combo in synthetic: + if args.check_name and combo['check_name'] != args.check_name: + continue + check_name = combo['check_name'] + job_name = combo['job_name'] + workflow_name = combo['workflow_name'] + history = combo['history'] + + print(f'[dry-run] Analyzing {len(history)} runs for {check_name!r}', + file=sys.stderr) + result = analyze_check(history, config) + if result is None: + print(f'[dry-run] → insufficient data (need ≥5 runs)', file=sys.stderr) + continue + + score_pct = f'{result["flakiness_score"]:.0%}' if result['flakiness_score'] else 
'0%' + print( + f'[dry-run] {check_name} \u2192 {result["classification"]} ' + f'(score={score_pct}, severity={result["severity"]})', + file=sys.stderr, + ) + print(f'[dry-run] Skipping D1 upsert for {check_name!r}', file=sys.stderr) + + entry = {'repo': args.repo, 'check_name': check_name, + 'job_name': job_name, 'workflow_name': workflow_name, + **result} + report[result['classification']].append(entry) + + n_flaky = len(report['flaky']) + n_det = len(report['deterministic']) + n_stab = len(report['stable']) + total = n_flaky + n_det + n_stab + print( + f'[dry-run] Flakiness score computed for {total} checks ' + f'({n_flaky} flaky, {n_det} deterministic, {n_stab} stable)', + file=sys.stderr, + ) + + else: + account_id, db_id, token = get_d1_credentials() + + base_sql = """ + SELECT DISTINCT check_name, job_name, workflow_name + FROM ci_run_history + WHERE repo = ? + """ + params = [args.repo] + if args.check_name: + base_sql += ' AND check_name = ?' + params.append(args.check_name) + + combos = d1_select(account_id, db_id, token, base_sql, params) + + for combo in combos: + check_name = combo['check_name'] + job_name = combo['job_name'] + workflow_name = combo['workflow_name'] + + history = d1_select( + account_id, db_id, token, + """ + SELECT status, conclusion_category + FROM ci_run_history + WHERE repo = ? AND check_name = ? AND job_name = ? AND workflow_name = ? 
+ ORDER BY timestamp ASC + """, + [args.repo, check_name, job_name, workflow_name], + ) + + result = analyze_check(history, config) + if result is None: + continue + + d1_query( + account_id, db_id, token, + """ + INSERT INTO flakiness_scores + (repo, workflow_name, check_name, job_name, flakiness_score, severity, + classification, total_runs, failure_count, flaky_failures, + consecutive_failures, last_updated) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) + ON CONFLICT(repo, workflow_name, check_name, job_name) DO UPDATE SET + repo = excluded.repo, + workflow_name = excluded.workflow_name, + flakiness_score = excluded.flakiness_score, + severity = excluded.severity, + classification = excluded.classification, + total_runs = excluded.total_runs, + failure_count = excluded.failure_count, + flaky_failures = excluded.flaky_failures, + consecutive_failures = excluded.consecutive_failures, + last_updated = excluded.last_updated + """, + [ + args.repo, workflow_name, check_name, job_name, + result['flakiness_score'], result['severity'], + result['classification'], result['total_runs'], + result['failure_count'], result['flaky_failures'], + result['consecutive_failures'], + ], + ) + + entry = { + 'repo': args.repo, + 'check_name': check_name, + 'job_name': job_name, + 'workflow_name': workflow_name, + **result, + } + report[result['classification']].append(entry) + + print(json.dumps(report, indent=2)) + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/scripts/flakiness/collect_ci_results.py b/scripts/flakiness/collect_ci_results.py new file mode 100644 index 00000000..ff17bb0c --- /dev/null +++ b/scripts/flakiness/collect_ci_results.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +""" +Collect CI job results from a GitHub Actions workflow run and write them to +Cloudflare D1 via the REST API. 
+ +Outputs JSON to stdout: + { + "failed_jobs": ["job_name", ...], + "run_attempt": 1, + "workflow_name": "CI", + "all_jobs": [{"check_name": ..., "status": ..., "conclusion_category": ...}, ...] + } + +Usage: + python collect_ci_results.py \\ + --workflow-run-id 12345678 \\ + --repo owner/repo \\ + --github-token ghp_... \\ + [--commit-sha abc123] \\ + [--pr-number 42] \\ + [--dry-run] +""" + +import argparse +import json +import os +import sys + +import requests + +sys.path.insert(0, os.path.dirname(__file__)) +from db_utils import get_d1_credentials, get_infra_patterns, load_config, d1_query + +GITHUB_API = 'https://api.github.com' + + +def _gh_headers(token): + headers = { + 'Accept': 'application/vnd.github+json', + 'User-Agent': 'BLT-Leaf-Flakiness/1.0', + 'X-GitHub-Api-Version': '2022-11-28', + } + if token: + headers['Authorization'] = f'Bearer {token}' + return headers + + +def fetch_run_meta(owner, repo, run_id, token): + url = f'{GITHUB_API}/repos/{owner}/{repo}/actions/runs/{run_id}' + resp = requests.get(url, headers=_gh_headers(token), timeout=30) + resp.raise_for_status() + return resp.json() + + +def fetch_jobs(owner, repo, run_id, token): + """Fetch all jobs for a run, following pagination.""" + url = f'{GITHUB_API}/repos/{owner}/{repo}/actions/runs/{run_id}/jobs' + jobs = [] + while url: + resp = requests.get(url, headers=_gh_headers(token), timeout=30) + resp.raise_for_status() + data = resp.json() + jobs.extend(data.get('jobs', [])) + url = resp.links.get('next', {}).get('url') + return jobs + + +def classify_conclusion(job, infra_patterns): + """ + Map a job's conclusion to a conclusion_category: + 'pass' | 'skip' | 'infrastructure' | 'test_failure' + """ + conclusion = (job.get('conclusion') or '').lower() + + if conclusion in ('skipped', 'cancelled', 'neutral'): + return 'skip' + + if conclusion == 'success': + return 'pass' + + # timed_out is always infrastructure regardless of logs + if conclusion == 'timed_out': + return 'infrastructure' + 
+ if conclusion == 'failure': + # Build a text blob from job name + all step names/conclusions for pattern matching + step_text = ' '.join( + f"{s.get('name', '')} {s.get('conclusion') or ''}" + for s in job.get('steps', []) + ).lower() + full_text = f"{job.get('name', '').lower()} {conclusion} {step_text}" + + for pattern in infra_patterns: + if pattern in full_text: + return 'infrastructure' + return 'test_failure' + + # unknown conclusions (e.g. 'action_required') → treat as pass + return 'pass' + + +def main(): + parser = argparse.ArgumentParser(description='Collect CI results for a workflow run') + parser.add_argument('--workflow-run-id', required=True, type=int) + parser.add_argument('--repo', required=True, help='owner/repo') + parser.add_argument('--github-token', default=os.environ.get('GITHUB_TOKEN')) + parser.add_argument('--commit-sha', default='') + parser.add_argument('--pr-number', type=int, default=None) + parser.add_argument('--dry-run', action='store_true', + help='Parse and print results without writing to DB') + args = parser.parse_args() + + owner, repo_name = args.repo.split('/', 1) + + if args.dry_run: + print('[dry-run] Dry run enabled — using synthetic data, skipping all external calls', + file=sys.stderr) + config = load_config() + infra_patterns = ['econnreset', 'timed_out', 'timeout', 'rate limit', + 'etimedout', 'fetch failed', 'network error'] + run_attempt = 1 + workflow_name = 'PR Validation' + commit_sha = args.commit_sha or 'abc123deadbeef' + jobs = [ + {'name': 'test-suite', 'conclusion': 'failure', + 'steps': [{'name': 'Run tests', 'conclusion': 'failure'}]}, + {'name': 'build', 'conclusion': 'success', 'steps': []}, + {'name': 'lint', 'conclusion': 'success', 'steps': []}, + {'name': 'type-check', 'conclusion': 'success', 'steps': []}, + {'name': 'integration-tests', 'conclusion': 'success', 'steps': []}, + {'name': 'deploy-preview', 'conclusion': 'success', 'steps': []}, + ] + else: + account_id, db_id, token = 
get_d1_credentials() + infra_patterns = get_infra_patterns(account_id, db_id, token) + config = load_config() + run_meta = fetch_run_meta(owner, repo_name, args.workflow_run_id, args.github_token) + run_attempt = run_meta.get('run_attempt', 1) + workflow_name = run_meta.get('name', 'unknown') + commit_sha = args.commit_sha or run_meta.get('head_sha', '') + jobs = fetch_jobs(owner, repo_name, args.workflow_run_id, args.github_token) + + rows = [] + failed_jobs = [] + + for job in jobs: + conclusion_category = classify_conclusion(job, infra_patterns) + if conclusion_category in ('pass',): + status = 'pass' + elif conclusion_category == 'skip': + status = 'skip' + else: + status = 'fail' + + row = { + 'check_name': job['name'], + 'job_name': job['name'], + 'workflow_name': workflow_name, + 'workflow_run_id': args.workflow_run_id, + 'run_attempt': run_attempt, + 'status': status, + 'conclusion_category': conclusion_category, + 'commit_sha': commit_sha, + 'pr_number': args.pr_number, + 'repo': args.repo, + } + rows.append(row) + + if status == 'fail' and conclusion_category == 'test_failure': + failed_jobs.append(job['name']) + + if not args.dry_run: + insert_sql = """ + INSERT INTO ci_run_history + (check_name, job_name, workflow_name, workflow_run_id, run_attempt, + status, conclusion_category, commit_sha, pr_number, repo) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """ + for row in rows: + d1_query( + account_id, db_id, token, + insert_sql, + [ + row['check_name'], row['job_name'], row['workflow_name'], + row['workflow_run_id'], row['run_attempt'], row['status'], + row['conclusion_category'], row['commit_sha'], + row['pr_number'], row['repo'], + ], + ) + + # Prune history older than configured retention window + prune_days = config.get('github', {}).get('history_prune_days', 90) + d1_query( + account_id, db_id, token, + "DELETE FROM ci_run_history WHERE timestamp < datetime('now', ?)", + [f'-{prune_days} days'], + ) + else: + print(f'[dry-run] Collected {len(rows)} CI jobs ' + f'({len(failed_jobs)} test failure(s)) — skipping D1 writes', + file=sys.stderr) + for r in rows: + icon = '\u2713' if r['status'] == 'pass' else '\u2717' + print(f'[dry-run] {icon} {r["job_name"]:30s} {r["conclusion_category"]}', + file=sys.stderr) + + output = { + 'failed_jobs': failed_jobs, + 'run_attempt': run_attempt, + 'workflow_name': workflow_name, + 'all_jobs': rows, + } + print(json.dumps(output)) + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/scripts/flakiness/db_utils.py b/scripts/flakiness/db_utils.py new file mode 100644 index 00000000..527024c6 --- /dev/null +++ b/scripts/flakiness/db_utils.py @@ -0,0 +1,89 @@ +"""Cloudflare D1 REST API client for the flakiness detection pipeline.""" + +import os + +import requests +import yaml + +_SCRIPTS_DIR = os.path.dirname(__file__) +CONFIG_PATH = os.path.normpath(os.path.join(_SCRIPTS_DIR, 'flakiness_config.yml')) + +_config_cache = None + + +def load_config(): + """Load and cache flakiness_config.yml.""" + global _config_cache + if _config_cache is None: + with open(CONFIG_PATH, encoding='utf-8') as f: + _config_cache = yaml.safe_load(f) + return _config_cache + + +def get_d1_credentials(): + """ + Read D1 credentials from environment variables. + Raises RuntimeError with a clear message if any are missing. 
+ """ + account_id = os.environ.get('CLOUDFLARE_ACCOUNT_ID') + db_id = os.environ.get('CLOUDFLARE_D1_DATABASE_ID') + token = os.environ.get('CLOUDFLARE_API_TOKEN') + missing = [k for k, v in { + 'CLOUDFLARE_ACCOUNT_ID': account_id, + 'CLOUDFLARE_D1_DATABASE_ID': db_id, + 'CLOUDFLARE_API_TOKEN': token, + }.items() if not v] + if missing: + raise RuntimeError( + f"Missing required environment variables: {', '.join(missing)}" + ) + return account_id, db_id, token + + +def d1_query(account_id, db_id, token, sql, params=None): + """ + Execute a SQL statement against Cloudflare D1 via the REST API. + Returns the raw result list from the API response. + Raises RuntimeError if the API reports failure. + """ + url = ( + f'https://api.cloudflare.com/client/v4/accounts/{account_id}' + f'/d1/database/{db_id}/query' + ) + body = {'sql': sql} + if params is not None: + body['params'] = params + resp = requests.post( + url, + headers={ + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json', + }, + json=body, + timeout=30, + ) + resp.raise_for_status() + data = resp.json() + if not data.get('success'): + raise RuntimeError(f'D1 query failed: {data.get("errors")}') + return data.get('result', []) + + +def d1_select(account_id, db_id, token, sql, params=None): + """ + Execute a SELECT and return a list of row dicts. + Convenience wrapper around d1_query(). 
+ """ + result = d1_query(account_id, db_id, token, sql, params) + if not result: + return [] + return result[0].get('results', []) + + +def get_infra_patterns(account_id, db_id, token): + """Return a list of lowercase infrastructure pattern strings from D1.""" + rows = d1_select( + account_id, db_id, token, + 'SELECT pattern FROM known_infrastructure_issues', + ) + return [row['pattern'].lower() for row in rows] diff --git a/scripts/flakiness/flakiness_config.yml b/scripts/flakiness/flakiness_config.yml new file mode 100644 index 00000000..9924a4de --- /dev/null +++ b/scripts/flakiness/flakiness_config.yml @@ -0,0 +1,33 @@ +thresholds: + window_size: 20 # number of recent runs to analyze + min_runs: 5 # minimum runs required before scoring + flaky_min_rate: 0.10 # failure rate lower bound for flaky classification + flaky_max_rate: 0.60 # failure rate upper bound; above this = deterministic + consecutive_failures_deterministic: 3 # N consecutive failures = deterministic failure + +severity: + low: [0.10, 0.20] + medium: [0.20, 0.40] + high: [0.40, 1.00] + +infrastructure_patterns: + - "ECONNRESET" + - "timed_out" + - "timeout" + - "rate limit" + - "ETIMEDOUT" + - "fetch failed" + - "network error" + - "Could not resolve host" + - "npm ERR!" + - "pip install failed" + +labels: + flaky_test: "flaky-test" + infrastructure: "ci-instability" + +github: + issue_title_prefix: "[Flaky Test]" + pr_comment_on_flake: true + block_merge_on_flake: false + history_prune_days: 90 # rows older than this are pruned from ci_run_history diff --git a/scripts/flakiness/report_flakiness.py b/scripts/flakiness/report_flakiness.py new file mode 100644 index 00000000..fd9c66fd --- /dev/null +++ b/scripts/flakiness/report_flakiness.py @@ -0,0 +1,395 @@ +#!/usr/bin/env python3 +""" +Report flakiness results to GitHub and write local report files. + +Actions performed: + 1. For each newly flaky check: create / reopen / update a GitHub Issue + labelled 'flaky-test' and 'ci-instability'. + 2. 
If --pr-number is given: post a PR comment summarising flaky checks. + 3. Write data/flakiness_report.md — human-readable summary. + 4. Write data/flakiness_metrics.json — machine-readable metrics. + +Reads the flaky report JSON from --flaky-report file or stdin +(output of analyze_flakiness.py). + +Usage: + python report_flakiness.py \\ + --repo owner/repo \\ + --github-token ghp_... \\ + [--pr-number 42] \\ + [--flaky-report /tmp/flaky_report.json] \\ + [--no-github] +""" + +import argparse +import json +import os +import sys +from datetime import datetime, timezone + +import requests + +sys.path.insert(0, os.path.dirname(__file__)) +from db_utils import get_d1_credentials, d1_select, load_config + +GITHUB_API = 'https://api.github.com' +_REPO_ROOT = os.path.normpath(os.path.join(os.path.dirname(__file__), '..', '..')) +DATA_DIR = os.path.join(_REPO_ROOT, 'data') + + +def _gh_headers(token): + headers = { + 'Accept': 'application/vnd.github+json', + 'User-Agent': 'BLT-Leaf-Flakiness/1.0', + 'X-GitHub-Api-Version': '2022-11-28', + } + if token: + headers['Authorization'] = f'Bearer {token}' + return headers + + +# --------------------------------------------------------------------------- +# GitHub Issue helpers +# --------------------------------------------------------------------------- + +def _check_identity(check_name, job_name, workflow_name): + check_name = check_name or 'unknown' + job_name = job_name or 'unknown' + workflow_name = workflow_name or 'unknown' + return f'{workflow_name} / {job_name} / {check_name}' + + +def _issue_title(check_name, job_name, workflow_name, prefix): + return f'{prefix} {_check_identity(check_name, job_name, workflow_name)}' + + +def search_flaky_issue(owner, repo, check_name, job_name, workflow_name, token, prefix): + """Return the first matching GitHub issue or None.""" + title = _issue_title(check_name, job_name, workflow_name, prefix) + query = f'"{title}" repo:{owner}/{repo} is:issue' + resp = requests.get( + 
f'{GITHUB_API}/search/issues', + params={'q': query, 'per_page': 5}, + headers=_gh_headers(token), + timeout=30, + ) + if resp.status_code != 200: + return None + items = resp.json().get('items', []) + return items[0] if items else None + + +def create_issue(owner, repo, entry, token, config, labels): + check_name = entry['check_name'] + job_name = entry.get('job_name') + workflow_name = entry.get('workflow_name') + prefix = config.get('github', {}).get('issue_title_prefix', '[Flaky Test]') + resp = requests.post( + f'{GITHUB_API}/repos/{owner}/{repo}/issues', + headers=_gh_headers(token), + json={ + 'title': _issue_title(check_name, job_name, workflow_name, prefix), + 'body': _build_issue_body(entry), + 'labels': labels, + }, + timeout=30, + ) + resp.raise_for_status() + return resp.json() + + +def add_issue_comment(owner, repo, issue_number, entry, token): + resp = requests.post( + f'{GITHUB_API}/repos/{owner}/{repo}/issues/{issue_number}/comments', + headers=_gh_headers(token), + json={'body': _build_issue_body(entry)}, + timeout=30, + ) + resp.raise_for_status() + + +def reopen_issue(owner, repo, issue_number, token): + resp = requests.patch( + f'{GITHUB_API}/repos/{owner}/{repo}/issues/{issue_number}', + headers=_gh_headers(token), + json={'state': 'open'}, + timeout=30, + ) + resp.raise_for_status() + + +def _build_issue_body(entry): + check_name = entry.get('check_name', 'unknown') + job_name = entry.get('job_name', 'unknown') + workflow_name = entry.get('workflow_name', 'unknown') + score = entry.get('flakiness_score', 0.0) + severity = entry.get('severity', 'unknown') + classification = entry.get('classification', 'unknown') + total = entry.get('total_runs', 0) + failures = entry.get('failure_count', 0) + flaky = entry.get('flaky_failures', 0) + failure_rate = f'{failures / total:.1%}' if total else 'N/A' + now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC') + + return f"""\ +## Flaky CI Check: `{_check_identity(check_name, job_name, 
workflow_name)}` + +**Detected:** {now} +**Workflow:** `{workflow_name}` | **Job:** `{job_name}` +**Classification:** `{classification}` | **Severity:** `{severity}` +**Flakiness Score:** {score:.2%} + +### Statistics (last analysis window) + +| Metric | Value | +|--------|-------| +| Total runs analysed | {total} | +| Failures | {failures} | +| Confirmed flakes (pass on retry) | {flaky} | +| Failure rate | {failure_rate} | + +### What this means +This check is failing **intermittently** without a deterministic code change. +It will **not block merges** while classified as flaky. + +### Next steps +- [ ] Investigate the root cause of intermittent failures +- [ ] If environment-dependent, add retry logic inside the test +- [ ] If infrastructure-related, add the log pattern to `known_infrastructure_issues` + +--- +*Automatically managed by [BLT-Leaf Flakiness Detector](../../scripts/flakiness/).* +""" + + +# --------------------------------------------------------------------------- +# PR comment +# --------------------------------------------------------------------------- + +def post_pr_comment(owner, repo, pr_number, flaky_entries, token): + if not flaky_entries: + return + rows = '\n'.join( + f'| `{_check_identity(e.get("check_name"), e.get("job_name"), e.get("workflow_name"))}` ' + f'| {e["severity"]} | {e["flakiness_score"]:.2%} ' + f'| {e["failure_count"]}/{e["total_runs"]} |' + for e in flaky_entries + ) + body = ( + '## :test_tube: Flaky Tests Detected\n\n' + 'The following CI checks were classified as **flaky** in this run. ' + 'They will **not block merge**.\n\n' + '| Workflow / Job / Check | Severity | Flakiness Score | Failures / Runs |\n' + '|------------------------|----------|----------------|------------------|\n' + f'{rows}\n\n' + '> Flaky checks fail intermittently without a code regression. ' + 'Maintainers have been notified via GitHub Issues.' 
+ ) + resp = requests.post( + f'{GITHUB_API}/repos/{owner}/{repo}/issues/{pr_number}/comments', + headers=_gh_headers(token), + json={'body': body}, + timeout=30, + ) + resp.raise_for_status() + + +# --------------------------------------------------------------------------- +# Report file builders +# --------------------------------------------------------------------------- + +def _build_markdown_report(all_scores, repo): + now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC') + flaky = [s for s in all_scores if s['classification'] == 'flaky'] + deterministic = [s for s in all_scores if s['classification'] == 'deterministic'] + stable = [s for s in all_scores if s['classification'] == 'stable'] + + flaky_sorted = sorted(flaky, key=lambda x: x['flakiness_score'], reverse=True) + flaky_rows = '\n'.join( + f'| `{e["workflow_name"]}` | `{e["job_name"]}` | `{e["check_name"]}` | {e["severity"]} ' + f'| {e["flakiness_score"]:.2%} | {e["failure_count"]}/{e["total_runs"]} ' + f'| {str(e.get("last_updated", ""))[:10]} |' + for e in flaky_sorted[:20] + ) or '_No flaky tests detected._' + + det_rows = '\n'.join( + f'| `{e["workflow_name"]}` | `{e["job_name"]}` | `{e["check_name"]}` ' + f'| {e["consecutive_failures"]} consecutive |' + for e in deterministic[:10] + ) or '_None._' + + return f"""\ +# Flakiness Report — {repo} + +Generated: {now} + +## Summary + +| Category | Count | +|----------|-------| +| Flaky | {len(flaky)} | +| Deterministic failures | {len(deterministic)} | +| Stable | {len(stable)} | + +## Top Flaky Checks + +| Workflow | Job | Check | Severity | Score | Failures / Runs | Last Updated | +|----------|-----|-------|----------|-------|-----------------|--------------| +{flaky_rows} + +## Deterministic Failures + +| Workflow | Job | Check | Evidence | +|----------|-----|-------|----------| +{det_rows} + +--- +*Managed by BLT-Leaf Flakiness Detector. 
\ +Thresholds: [`scripts/flakiness/flakiness_config.yml`](../scripts/flakiness/flakiness_config.yml)* +""" + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description='Report flakiness to GitHub and write local report files' + ) + parser.add_argument('--repo', required=True, help='owner/repo') + parser.add_argument('--github-token', default=os.environ.get('GITHUB_TOKEN')) + parser.add_argument('--pr-number', type=int, default=None) + parser.add_argument('--flaky-report', default=None, + help='Path to JSON from analyze_flakiness.py; reads stdin if omitted') + parser.add_argument('--no-github', action='store_true', + help='Skip all GitHub API calls (useful for local testing)') + parser.add_argument('--dry-run', action='store_true', + help='Skip D1 reads and all GitHub calls; write report files only') + args = parser.parse_args() + + config = load_config() + + if '/' not in args.repo: + print(f"Error: --repo must be in 'owner/repo' format, got '{args.repo}'", file=sys.stderr) + return 1 + owner, repo_name = args.repo.split('/', 1) + + if not args.dry_run: + account_id, db_id, token = get_d1_credentials() + + # Load flaky report + if args.flaky_report: + with open(args.flaky_report, encoding='utf-8') as fh: + report = json.load(fh) + elif not args.dry_run: + report = json.loads(sys.stdin.read()) + else: + # dry-run with no file: use synthetic data + print('[dry-run] No --flaky-report supplied; using synthetic data', file=sys.stderr) + report = { + 'flaky': [ + {'check_name': 'test-suite', 'job_name': 'test-suite', + 'workflow_name': 'PR Validation', 'classification': 'flaky', + 'flakiness_score': 0.15, 'severity': 'low', + 'total_runs': 20, 'failure_count': 3, 'flaky_failures': 1, + 'consecutive_failures': 0}, + {'check_name': 'integration-tests', 'job_name': 'integration-tests', + 
'workflow_name': 'PR Validation', 'classification': 'flaky', + 'flakiness_score': 0.10, 'severity': 'low', + 'total_runs': 20, 'failure_count': 2, 'flaky_failures': 1, + 'consecutive_failures': 0}, + ], + 'deterministic': [], + 'stable': [ + {'check_name': 'build', 'job_name': 'build', + 'workflow_name': 'PR Validation', 'classification': 'stable', + 'flakiness_score': 0.0, 'severity': 'stable', + 'total_runs': 20, 'failure_count': 0, 'flaky_failures': 0, + 'consecutive_failures': 0}, + ], + } + + flaky_entries = report.get('flaky', []) + label_flaky = config.get('labels', {}).get('flaky_test', 'flaky-test') + label_infra = config.get('labels', {}).get('infrastructure', 'ci-instability') + prefix = config.get('github', {}).get('issue_title_prefix', '[Flaky Test]') + + # --- GitHub Issue automation --- + if not args.no_github and not args.dry_run and args.github_token: + for entry in flaky_entries: + check_name = entry['check_name'] + job_name = entry.get('job_name') + workflow_name = entry.get('workflow_name') + identity = _check_identity(check_name, job_name, workflow_name) + existing = search_flaky_issue( + owner, repo_name, check_name, job_name, workflow_name, + args.github_token, prefix + ) + if existing is None: + create_issue(owner, repo_name, entry, args.github_token, + config, [label_flaky, label_infra]) + print(f'[report] Created issue for: {identity}', file=sys.stderr) + elif existing.get('state') == 'closed': + reopen_issue(owner, repo_name, existing['number'], args.github_token) + add_issue_comment(owner, repo_name, existing['number'], + entry, args.github_token) + print(f'[report] Reopened issue #{existing["number"]} for: {identity}', + file=sys.stderr) + else: + add_issue_comment(owner, repo_name, existing['number'], + entry, args.github_token) + print(f'[report] Updated issue #{existing["number"]} for: {identity}', + file=sys.stderr) + + # --- PR comment --- + if (args.pr_number and flaky_entries + and config.get('github', 
{}).get('pr_comment_on_flake', True)): + post_pr_comment( + owner, repo_name, args.pr_number, flaky_entries, args.github_token + ) + print(f'[report] Posted PR comment on #{args.pr_number}', file=sys.stderr) + + # --- Write local report files --- + if args.dry_run: + print('[dry-run] Dry run enabled — skipping D1 reads, building report from input', + file=sys.stderr) + all_scores = ( + report.get('flaky', []) + + report.get('deterministic', []) + + report.get('stable', []) + ) + else: + all_scores = d1_select( + account_id, db_id, token, + 'SELECT * FROM flakiness_scores WHERE repo = ? ORDER BY flakiness_score DESC', + [args.repo], + ) + + os.makedirs(DATA_DIR, exist_ok=True) + + report_path = os.path.join(DATA_DIR, 'flakiness_report.md') + with open(report_path, 'w', encoding='utf-8') as fh: + fh.write(_build_markdown_report(all_scores, args.repo)) + print(f'[report] Wrote {report_path}', file=sys.stderr) + + metrics = { + 'generated_at': datetime.now(timezone.utc).isoformat(), + 'repo': args.repo, + 'summary': { + 'flaky': len([s for s in all_scores if s['classification'] == 'flaky']), + 'deterministic': len([s for s in all_scores if s['classification'] == 'deterministic']), + 'stable': len([s for s in all_scores if s['classification'] == 'stable']), + }, + 'scores': all_scores, + } + metrics_path = os.path.join(DATA_DIR, 'flakiness_metrics.json') + with open(metrics_path, 'w', encoding='utf-8') as fh: + json.dump(metrics, fh, indent=2) + print(f'[report] Wrote {metrics_path}', file=sys.stderr) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/scripts/flakiness/retry_failures.py b/scripts/flakiness/retry_failures.py new file mode 100644 index 00000000..9ef9c81c --- /dev/null +++ b/scripts/flakiness/retry_failures.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +""" +Actively retry failed CI jobs when the current run is the first attempt. + +- If run_attempt > 1: skip re-triggering (already retried), report as-is. 
+- If run_attempt == 1: call rerun-failed-jobs, poll for completion, then + check which previously-failed jobs now passed and mark them as flake_confirmed. + +Reads collect_ci_results.py output from --collect-output file or stdin. + +Outputs JSON to stdout: + { + "job_name": "confirmed_flake" | "real_failure" | "skipped_already_retried" | + "rerun_not_permitted" | "poll_timeout", + ... + } + +Usage: + python retry_failures.py \\ + --workflow-run-id 12345678 \\ + --repo owner/repo \\ + --github-token ghp_... \\ + --collect-output /tmp/collect_output.json +""" + +import argparse +import json +import os +import sys +import time + +import requests + +sys.path.insert(0, os.path.dirname(__file__)) +from db_utils import get_d1_credentials, d1_query + +GITHUB_API = 'https://api.github.com' +POLL_INTERVAL_S = 30 # seconds between status polls +MAX_POLL_S = 600 # 10-minute maximum wait + + +def _gh_headers(token): + headers = { + 'Accept': 'application/vnd.github+json', + 'User-Agent': 'BLT-Leaf-Flakiness/1.0', + 'X-GitHub-Api-Version': '2022-11-28', + } + if token: + headers['Authorization'] = f'Bearer {token}' + return headers + + +def trigger_rerun(owner, repo, run_id, token): + """POST rerun-failed-jobs. Returns True if triggered, False if not permitted.""" + url = f'{GITHUB_API}/repos/{owner}/{repo}/actions/runs/{run_id}/rerun-failed-jobs' + resp = requests.post(url, headers=_gh_headers(token), timeout=30) + if resp.status_code == 403: + print('[retry] No permission to trigger rerun (403). Skipping.', file=sys.stderr) + return False + resp.raise_for_status() + return True + + +def poll_run_completion(owner, repo, run_id, token): + """ + Poll the run status until it completes or timeout is reached. + Returns (conclusion, new_run_attempt) or (None, None) on timeout. 
+ """ + url = f'{GITHUB_API}/repos/{owner}/{repo}/actions/runs/{run_id}' + deadline = time.monotonic() + MAX_POLL_S + + while time.monotonic() < deadline: + resp = requests.get(url, headers=_gh_headers(token), timeout=30) + resp.raise_for_status() + data = resp.json() + if data.get('status') == 'completed': + return data.get('conclusion'), data.get('run_attempt', 1) + print(f'[retry] Run {run_id} status={data.get("status")!r}, ' + f'waiting {POLL_INTERVAL_S}s…', file=sys.stderr) + time.sleep(POLL_INTERVAL_S) + + print(f'[retry] Timed out waiting for run {run_id}.', file=sys.stderr) + return None, None + + +def fetch_job_conclusions(owner, repo, run_id, token): + """Return {job_name: conclusion} for the latest attempt of each job.""" + url = f'{GITHUB_API}/repos/{owner}/{repo}/actions/runs/{run_id}/jobs' + conclusions = {} + while url: + resp = requests.get(url, headers=_gh_headers(token), timeout=30) + resp.raise_for_status() + data = resp.json() + for job in data.get('jobs', []): + conclusions[job['name']] = job.get('conclusion', 'unknown') + url = resp.links.get('next', {}).get('url') + return conclusions + + +def mark_flake_confirmed(account_id, db_id, token, workflow_run_id, job_name, repo, new_attempt): + """ + Insert a flake_confirmed row by copying the original failure row's metadata + and recording the rerun as a passing attempt. + """ + d1_query( + account_id, db_id, token, + """ + INSERT INTO ci_run_history + (check_name, job_name, workflow_name, workflow_run_id, run_attempt, + status, conclusion_category, commit_sha, pr_number, repo) + SELECT check_name, job_name, workflow_name, ?, ?, + 'pass', 'flake_confirmed', commit_sha, pr_number, repo + FROM ci_run_history + WHERE workflow_run_id = ? AND job_name = ? AND repo = ? 
+ ORDER BY id DESC + LIMIT 1 + """, + [workflow_run_id, new_attempt, workflow_run_id, job_name, repo], + ) + + +def main(): + parser = argparse.ArgumentParser( + description='Retry failed CI jobs and classify results as flake or real failure' + ) + parser.add_argument('--workflow-run-id', required=True, type=int) + parser.add_argument('--repo', required=True, help='owner/repo') + parser.add_argument('--github-token', default=os.environ.get('GITHUB_TOKEN')) + parser.add_argument('--collect-output', default=None, + help='Path to JSON file from collect_ci_results.py; ' + 'reads stdin if omitted') + parser.add_argument('--dry-run', action='store_true', + help='Simulate retry without any GitHub or D1 calls') + args = parser.parse_args() + + if args.collect_output: + with open(args.collect_output, encoding='utf-8-sig') as fh: + collect_data = json.load(fh) + else: + collect_data = json.loads(sys.stdin.read()) + + failed_jobs = collect_data.get('failed_jobs', []) + run_attempt = collect_data.get('run_attempt', 1) + result = {} + + if not failed_jobs: + print(json.dumps(result)) + return 0 + + # --- Dry-run: simulate the retry without touching GitHub or D1 --- + if args.dry_run: + print('[dry-run] Dry run enabled — simulating retry, skipping GitHub/D1 calls', + file=sys.stderr) + if run_attempt > 1: + print(f'[dry-run] run_attempt={run_attempt} — already retried, skipping.', + file=sys.stderr) + for job in failed_jobs: + result[job] = 'skipped_already_retried' + else: + print(f'[dry-run] Simulating rerun-failed-jobs for run ' + f'{args.workflow_run_id}…', file=sys.stderr) + for job in failed_jobs: + print(f'[retry] Retrying failed job: {job}', file=sys.stderr) + print('[retry] Simulated retry result: success', file=sys.stderr) + result[job] = 'confirmed_flake' + print(f'[retry] Marked as flaky: {job!r} (passed on retry — ' + 'skipping D1 write)', file=sys.stderr) + n_flakes = len([v for v in result.values() if v == 'confirmed_flake']) + print(f'[dry-run] {n_flakes} flaky 
job(s) confirmed', file=sys.stderr) + print(json.dumps(result)) + return 0 + + # If this is already a retry run, skip re-triggering + if run_attempt > 1: + print(f'[retry] run_attempt={run_attempt} — already retried. ' + 'Skipping re-trigger.', file=sys.stderr) + for job in failed_jobs: + result[job] = 'skipped_already_retried' + print(json.dumps(result)) + return 0 + + if '/' not in args.repo: + print(f"Error: --repo must be in 'owner/repo' format, got '{args.repo}'", file=sys.stderr) + for job in failed_jobs: + result[job] = 'malformed_repo_arg' + print(json.dumps(result)) + return 1 + owner, repo_name = args.repo.split('/', 1) + account_id, db_id, token = get_d1_credentials() + + print(f'[retry] Triggering rerun-failed-jobs for run ' + f'{args.workflow_run_id}…', file=sys.stderr) + triggered = trigger_rerun(owner, repo_name, args.workflow_run_id, args.github_token) + + if not triggered: + for job in failed_jobs: + result[job] = 'rerun_not_permitted' + print(json.dumps(result)) + return 0 + + _, new_attempt = poll_run_completion( + owner, repo_name, args.workflow_run_id, args.github_token + ) + + if new_attempt is None: + for job in failed_jobs: + result[job] = 'poll_timeout' + print(json.dumps(result)) + return 0 + + # Compare per-job outcomes from the rerun + job_conclusions = fetch_job_conclusions( + owner, repo_name, args.workflow_run_id, args.github_token + ) + + for job in failed_jobs: + rerun_conclusion = job_conclusions.get(job, 'unknown') + if rerun_conclusion == 'success': + result[job] = 'confirmed_flake' + mark_flake_confirmed(account_id, db_id, token, args.workflow_run_id, job, args.repo, new_attempt) + print(f'[retry] {job!r}: confirmed flake (passed on retry)', file=sys.stderr) + else: + result[job] = 'real_failure' + print(f'[retry] {job!r}: real failure (failed again on retry)', file=sys.stderr) + print(json.dumps(result)) + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/scripts/flakiness/tests/__init__.py 
b/scripts/flakiness/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/scripts/flakiness/tests/test_analyze_flakiness.py b/scripts/flakiness/tests/test_analyze_flakiness.py new file mode 100644 index 00000000..b1d0694d --- /dev/null +++ b/scripts/flakiness/tests/test_analyze_flakiness.py @@ -0,0 +1,275 @@ +"""Tests for analyze_flakiness.py — pure functions and main() integration.""" + +import json +import os +import sys +import unittest +from io import StringIO +from unittest.mock import patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +import db_utils +from analyze_flakiness import _get_severity, analyze_check, main + + +# --------------------------------------------------------------------------- +# Minimal config matching flakiness_config.yml defaults +# --------------------------------------------------------------------------- +_CONFIG = { + 'thresholds': { + 'window_size': 20, + 'min_runs': 5, + 'flaky_min_rate': 0.10, + 'flaky_max_rate': 0.60, + 'consecutive_failures_deterministic': 3, + }, + 'severity': { + 'low': [0.10, 0.20], + 'medium': [0.20, 0.40], + }, +} + + +def _row(status, category): + return {'status': status, 'conclusion_category': category} + + +def _passes(n): + return [_row('pass', 'pass')] * n + + +def _failures(n, category='test_failure'): + return [_row('fail', category)] * n + + +# --------------------------------------------------------------------------- +# _get_severity +# --------------------------------------------------------------------------- +class TestGetSeverity(unittest.TestCase): + def test_below_low_threshold_returns_stable(self): + self.assertEqual(_get_severity(0.05, _CONFIG), 'stable') + + def test_exactly_at_low_min_returns_low(self): + self.assertEqual(_get_severity(0.10, _CONFIG), 'low') + + def test_in_low_band_returns_low(self): + self.assertEqual(_get_severity(0.15, _CONFIG), 'low') + + def test_at_low_upper_bound_enters_medium(self): + # 0.20 >= medium low 
[0.20, 0.40] → medium + self.assertEqual(_get_severity(0.20, _CONFIG), 'medium') + + def test_in_medium_band_returns_medium(self): + self.assertEqual(_get_severity(0.30, _CONFIG), 'medium') + + def test_above_medium_upper_bound_returns_high(self): + self.assertEqual(_get_severity(0.45, _CONFIG), 'high') + + def test_zero_returns_stable(self): + self.assertEqual(_get_severity(0.0, _CONFIG), 'stable') + + def test_one_returns_high(self): + self.assertEqual(_get_severity(1.0, _CONFIG), 'high') + + +# --------------------------------------------------------------------------- +# analyze_check +# --------------------------------------------------------------------------- +class TestAnalyzeCheck(unittest.TestCase): + def test_returns_none_when_insufficient_runs(self): + # 4 rows < min_runs=5 + self.assertIsNone(analyze_check(_passes(3) + _failures(1), _CONFIG)) + + def test_returns_none_when_all_rows_are_infrastructure(self): + rows = _failures(5, category='infrastructure') + self.assertIsNone(analyze_check(rows, _CONFIG)) + + def test_stable_with_zero_failures(self): + result = analyze_check(_passes(8), _CONFIG) + self.assertIsNotNone(result) + self.assertEqual(result['classification'], 'stable') + self.assertEqual(result['flakiness_score'], 0.0) + + def test_flaky_with_failure_rate_in_range(self): + # 8 passes + 2 failures = 20% → flaky (in [0.10, 0.60]) + result = analyze_check(_passes(8) + _failures(2), _CONFIG) + self.assertIsNotNone(result) + self.assertEqual(result['classification'], 'flaky') + self.assertAlmostEqual(result['flakiness_score'], 0.2) + + def test_flaky_severity_assigned(self): + rows = _passes(8) + _failures(2) + result = analyze_check(rows, _CONFIG) + self.assertIn(result['severity'], ('low', 'medium', 'high')) + + def test_deterministic_by_consecutive_failures(self): + # 5 passes then exactly 3 consecutive test_failures → deterministic + result = analyze_check(_passes(5) + _failures(3), _CONFIG) + self.assertIsNotNone(result) + 
self.assertEqual(result['classification'], 'deterministic') + self.assertEqual(result['consecutive_failures'], 3) + + def test_deterministic_by_high_failure_rate(self): + # 2 passes + 8 failures = 80% > flaky_max=0.60 → deterministic + result = analyze_check(_passes(2) + _failures(8), _CONFIG) + self.assertIsNotNone(result) + self.assertEqual(result['classification'], 'deterministic') + + def test_infrastructure_rows_excluded_from_window(self): + # 3 infra rows + 5 pass rows → only 5 effective non-infra rows + rows = _failures(3, category='infrastructure') + _passes(5) + result = analyze_check(rows, _CONFIG) + self.assertIsNotNone(result) + self.assertEqual(result['total_runs'], 5) + self.assertEqual(result['classification'], 'stable') + + def test_flake_confirmed_rows_counted_in_flaky_failures(self): + rows = ( + _passes(7) + + [_row('pass', 'flake_confirmed')] * 2 + + _failures(1) + ) + result = analyze_check(rows, _CONFIG) + self.assertIsNotNone(result) + self.assertEqual(result['flaky_failures'], 2) + + def test_window_size_limits_history_analyzed(self): + # 25 rows: first 5 failures, then 20 passes. + # With window=20, only the last 20 (all passes) are considered → stable. 
+ rows = _failures(5) + _passes(20) + result = analyze_check(rows, _CONFIG) + self.assertIsNotNone(result) + self.assertEqual(result['total_runs'], 20) + self.assertEqual(result['classification'], 'stable') + + def test_consecutive_count_ignores_infrastructure_failures(self): + # 5 passes, 2 infra fails, 2 test_failures at the end + # Infrastructure rows are filtered; consecutive test_failures = 2 (< 3 → not deterministic) + rows = _passes(5) + _failures(2, 'infrastructure') + _failures(2, 'test_failure') + result = analyze_check(rows, _CONFIG) + self.assertIsNotNone(result) + self.assertEqual(result['consecutive_failures'], 2) + # total non-infra = 7, failures = 2, rate = 0.286 → flaky + self.assertEqual(result['classification'], 'flaky') + + def test_result_contains_all_required_fields(self): + rows = _passes(8) + _failures(2) + result = analyze_check(rows, _CONFIG) + for field in ('classification', 'flakiness_score', 'severity', + 'total_runs', 'failure_count', 'flaky_failures', + 'consecutive_failures'): + self.assertIn(field, result) + + def test_deterministic_score_is_zero(self): + rows = _passes(5) + _failures(4) + result = analyze_check(rows, _CONFIG) + self.assertEqual(result['classification'], 'deterministic') + self.assertEqual(result['flakiness_score'], 0.0) + + def test_stable_score_is_zero(self): + rows = _passes(10) + result = analyze_check(rows, _CONFIG) + self.assertEqual(result['flakiness_score'], 0.0) + + def test_exactly_at_flaky_min_rate_is_flaky(self): + # 9 passes + 1 failure = 10% = flaky_min_rate exact boundary → flaky + rows = _passes(9) + _failures(1) + result = analyze_check(rows, _CONFIG) + self.assertIsNotNone(result) + self.assertEqual(result['classification'], 'flaky') + + +# --------------------------------------------------------------------------- +# main() integration +# --------------------------------------------------------------------------- +class TestAnalyzeMain(unittest.TestCase): + def _history_rows(self): + """10-row 
history: 6 passes, 2 failures, 2 passes → 20% → flaky (no trailing streak).""" + return _passes(6) + _failures(2) + _passes(2) + + @patch('analyze_flakiness.d1_query') + @patch('analyze_flakiness.d1_select') + @patch('analyze_flakiness.get_d1_credentials') + def test_main_outputs_valid_json_with_expected_keys( + self, mock_creds, mock_select, mock_query + ): + mock_creds.return_value = ('acct', 'db', 'tok') + combo = {'check_name': 'lint', 'job_name': 'lint', 'workflow_name': 'CI'} + mock_select.side_effect = [[combo], self._history_rows()] + mock_query.return_value = [{'results': []}] + + with patch('sys.argv', ['analyze_flakiness.py', '--repo', 'owner/repo']): + with patch('sys.stdout', new_callable=StringIO) as mock_out: + main() + output = json.loads(mock_out.getvalue()) + + self.assertIn('flaky', output) + self.assertIn('deterministic', output) + self.assertIn('stable', output) + + @patch('analyze_flakiness.d1_query') + @patch('analyze_flakiness.d1_select') + @patch('analyze_flakiness.get_d1_credentials') + def test_main_classifies_flaky_check_correctly( + self, mock_creds, mock_select, mock_query + ): + mock_creds.return_value = ('acct', 'db', 'tok') + combo = {'check_name': 'lint', 'job_name': 'lint', 'workflow_name': 'CI'} + mock_select.side_effect = [[combo], self._history_rows()] + mock_query.return_value = [{'results': []}] + + with patch('sys.argv', ['analyze_flakiness.py', '--repo', 'owner/repo']): + with patch('sys.stdout', new_callable=StringIO) as mock_out: + main() + output = json.loads(mock_out.getvalue()) + + self.assertEqual(len(output['flaky']), 1) + self.assertEqual(output['flaky'][0]['check_name'], 'lint') + + @patch('analyze_flakiness.d1_query') + @patch('analyze_flakiness.d1_select') + @patch('analyze_flakiness.get_d1_credentials') + def test_main_skips_checks_with_insufficient_history( + self, mock_creds, mock_select, mock_query + ): + mock_creds.return_value = ('acct', 'db', 'tok') + combo = {'check_name': 'new-job', 'job_name': 
'new-job', 'workflow_name': 'CI'} + # Only 3 rows — below min_runs=5 + mock_select.side_effect = [[combo], _passes(3)] + mock_query.return_value = [{'results': []}] + + with patch('sys.argv', ['analyze_flakiness.py', '--repo', 'owner/repo']): + with patch('sys.stdout', new_callable=StringIO) as mock_out: + main() + output = json.loads(mock_out.getvalue()) + + # Should not appear in any category + total = (len(output['flaky']) + len(output['deterministic']) + + len(output['stable'])) + self.assertEqual(total, 0) + + @patch('analyze_flakiness.d1_query') + @patch('analyze_flakiness.d1_select') + @patch('analyze_flakiness.get_d1_credentials') + def test_main_upserts_score_for_each_classified_check( + self, mock_creds, mock_select, mock_query + ): + mock_creds.return_value = ('acct', 'db', 'tok') + combo = {'check_name': 'lint', 'job_name': 'lint', 'workflow_name': 'CI'} + mock_select.side_effect = [[combo], self._history_rows()] + mock_query.return_value = [{'results': []}] + + with patch('sys.argv', ['analyze_flakiness.py', '--repo', 'owner/repo']): + with patch('sys.stdout', new_callable=StringIO): + main() + + # d1_query should be called once to upsert the score + mock_query.assert_called_once() + sql = mock_query.call_args[0][3] + self.assertIn('INSERT INTO flakiness_scores', sql) + self.assertIn('ON CONFLICT', sql) + + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/flakiness/tests/test_collect_ci_results.py b/scripts/flakiness/tests/test_collect_ci_results.py new file mode 100644 index 00000000..417cb2c3 --- /dev/null +++ b/scripts/flakiness/tests/test_collect_ci_results.py @@ -0,0 +1,260 @@ +"""Tests for collect_ci_results.py — classify_conclusion() and main() integration.""" + +import json +import os +import sys +import tempfile +import unittest +from io import StringIO +from unittest.mock import patch, MagicMock + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from collect_ci_results import classify_conclusion, main + 
+_INFRA_PATTERNS = ['econnreset', 'timed_out', 'timeout', 'network error', 'fetch failed'] + + +def _job(conclusion, name='test-job', steps=None): + return {'name': name, 'conclusion': conclusion, 'steps': steps or []} + + +def _step(name, conclusion='success'): + return {'name': name, 'conclusion': conclusion} + + +# --------------------------------------------------------------------------- +# classify_conclusion +# --------------------------------------------------------------------------- +class TestClassifyConclusion(unittest.TestCase): + def test_success_is_pass(self): + self.assertEqual(classify_conclusion(_job('success'), _INFRA_PATTERNS), 'pass') + + def test_skipped_is_skip(self): + self.assertEqual(classify_conclusion(_job('skipped'), _INFRA_PATTERNS), 'skip') + + def test_cancelled_is_skip(self): + self.assertEqual(classify_conclusion(_job('cancelled'), _INFRA_PATTERNS), 'skip') + + def test_neutral_is_skip(self): + self.assertEqual(classify_conclusion(_job('neutral'), _INFRA_PATTERNS), 'skip') + + def test_timed_out_is_always_infrastructure(self): + self.assertEqual(classify_conclusion(_job('timed_out'), _INFRA_PATTERNS), 'infrastructure') + + def test_failure_with_infra_pattern_in_step_name(self): + steps = [_step('Install deps — fetch failed happened', 'failure')] + self.assertEqual( + classify_conclusion(_job('failure', steps=steps), _INFRA_PATTERNS), + 'infrastructure', + ) + + def test_failure_with_econnreset_in_step(self): + steps = [_step('Run tests (ECONNRESET)', 'failure')] + self.assertEqual( + classify_conclusion(_job('failure', steps=steps), _INFRA_PATTERNS), + 'infrastructure', + ) + + def test_failure_with_network_error_in_job_name(self): + job = _job('failure', name='network error prone job') + self.assertEqual(classify_conclusion(job, _INFRA_PATTERNS), 'infrastructure') + + def test_failure_without_infra_pattern_is_test_failure(self): + steps = [_step('Run unit tests', 'failure')] + self.assertEqual( + 
classify_conclusion(_job('failure', steps=steps), _INFRA_PATTERNS), + 'test_failure', + ) + + def test_failure_with_no_steps_is_test_failure(self): + self.assertEqual(classify_conclusion(_job('failure'), _INFRA_PATTERNS), 'test_failure') + + def test_unknown_conclusion_treated_as_pass(self): + self.assertEqual( + classify_conclusion(_job('action_required'), _INFRA_PATTERNS), 'pass' + ) + + def test_empty_infra_patterns_makes_all_failures_test_failure(self): + steps = [_step('timeout step', 'failure')] + self.assertEqual( + classify_conclusion(_job('failure', steps=steps), []), + 'test_failure', + ) + + def test_case_insensitive_pattern_matching(self): + # Pattern stored lowercase, job conclusion text is lowercased before matching + steps = [_step('NETWORK ERROR in step', 'failure')] + self.assertEqual( + classify_conclusion(_job('failure', steps=steps), _INFRA_PATTERNS), + 'infrastructure', + ) + + +# --------------------------------------------------------------------------- +# main() integration +# --------------------------------------------------------------------------- +class TestCollectMain(unittest.TestCase): + def setUp(self): + self._mock_run_meta = { + 'run_attempt': 1, + 'name': 'PR Validation', + 'head_sha': 'abc123', + } + self._mock_jobs = { + 'jobs': [ + _job('success', name='lint'), + _job('failure', name='test', + steps=[_step('Run tests', 'failure')]), + ] + } + + def _make_get_side_effect(self): + def side_effect(url, **kwargs): + resp = MagicMock() + resp.raise_for_status = MagicMock() + resp.links = {} + if '/jobs' in url: + resp.json.return_value = self._mock_jobs + else: + resp.json.return_value = self._mock_run_meta + return resp + return side_effect + + @patch('collect_ci_results.d1_query') + @patch('collect_ci_results.get_d1_credentials') + @patch('collect_ci_results.get_infra_patterns') + @patch('collect_ci_results.requests.get') + def test_failed_jobs_list_contains_only_test_failures( + self, mock_get, mock_infra, mock_creds, mock_d1 + 
): + mock_get.side_effect = self._make_get_side_effect() + mock_infra.return_value = [] + mock_creds.return_value = ('acct', 'db', 'tok') + mock_d1.return_value = [{'results': []}] + + with patch('sys.argv', [ + 'collect_ci_results.py', + '--workflow-run-id', '99999', + '--repo', 'owner/repo', + '--github-token', 'tok', + ]): + with patch('sys.stdout', new_callable=StringIO) as mock_out: + main() + output = json.loads(mock_out.getvalue()) + + self.assertIn('test', output['failed_jobs']) + self.assertNotIn('lint', output['failed_jobs']) + + @patch('collect_ci_results.d1_query') + @patch('collect_ci_results.get_d1_credentials') + @patch('collect_ci_results.get_infra_patterns') + @patch('collect_ci_results.requests.get') + def test_output_contains_all_required_fields( + self, mock_get, mock_infra, mock_creds, mock_d1 + ): + mock_get.side_effect = self._make_get_side_effect() + mock_infra.return_value = [] + mock_creds.return_value = ('acct', 'db', 'tok') + mock_d1.return_value = [{'results': []}] + + with patch('sys.argv', [ + 'collect_ci_results.py', + '--workflow-run-id', '99999', + '--repo', 'owner/repo', + '--github-token', 'tok', + ]): + with patch('sys.stdout', new_callable=StringIO) as mock_out: + main() + output = json.loads(mock_out.getvalue()) + + for key in ('failed_jobs', 'run_attempt', 'workflow_name', 'all_jobs'): + self.assertIn(key, output) + + @patch('collect_ci_results.d1_query') + @patch('collect_ci_results.get_d1_credentials') + @patch('collect_ci_results.get_infra_patterns') + @patch('collect_ci_results.requests.get') + def test_dry_run_does_not_call_d1( + self, mock_get, mock_infra, mock_creds, mock_d1 + ): + mock_get.side_effect = self._make_get_side_effect() + mock_infra.return_value = [] + mock_creds.return_value = ('acct', 'db', 'tok') + + with patch('sys.argv', [ + 'collect_ci_results.py', + '--workflow-run-id', '99999', + '--repo', 'owner/repo', + '--github-token', 'tok', + '--dry-run', + ]): + with patch('sys.stdout', 
new_callable=StringIO): + main() + + mock_d1.assert_not_called() + + @patch('collect_ci_results.d1_query') + @patch('collect_ci_results.get_d1_credentials') + @patch('collect_ci_results.get_infra_patterns') + @patch('collect_ci_results.requests.get') + def test_d1_insert_called_for_each_job( + self, mock_get, mock_infra, mock_creds, mock_d1 + ): + mock_get.side_effect = self._make_get_side_effect() + mock_infra.return_value = [] + mock_creds.return_value = ('acct', 'db', 'tok') + mock_d1.return_value = [{'results': []}] + + with patch('sys.argv', [ + 'collect_ci_results.py', + '--workflow-run-id', '99999', + '--repo', 'owner/repo', + '--github-token', 'tok', + ]): + with patch('sys.stdout', new_callable=StringIO): + main() + + # 2 jobs + 1 prune DELETE = 3 d1_query calls + self.assertEqual(mock_d1.call_count, 3) + # All INSERT calls should target ci_run_history + insert_calls = [ + c for c in mock_d1.call_args_list + if 'INSERT INTO ci_run_history' in c[0][3] + ] + self.assertEqual(len(insert_calls), 2) + + @patch('collect_ci_results.d1_query') + @patch('collect_ci_results.get_d1_credentials') + @patch('collect_ci_results.get_infra_patterns') + @patch('collect_ci_results.requests.get') + def test_infrastructure_failure_not_in_failed_jobs( + self, mock_get, mock_infra, mock_creds, mock_d1 + ): + # replace the failing job with an infrastructure failure + self._mock_jobs = { + 'jobs': [ + _job('failure', name='deploy', + steps=[_step('Network call ECONNRESET', 'failure')]), + ] + } + mock_get.side_effect = self._make_get_side_effect() + mock_infra.return_value = ['econnreset'] + mock_creds.return_value = ('acct', 'db', 'tok') + mock_d1.return_value = [{'results': []}] + + with patch('sys.argv', [ + 'collect_ci_results.py', + '--workflow-run-id', '99999', + '--repo', 'owner/repo', + '--github-token', 'tok', + ]): + with patch('sys.stdout', new_callable=StringIO) as mock_out: + main() + output = json.loads(mock_out.getvalue()) + + self.assertEqual(output['failed_jobs'], 
[]) + + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/flakiness/tests/test_db_utils.py b/scripts/flakiness/tests/test_db_utils.py new file mode 100644 index 00000000..d4afb396 --- /dev/null +++ b/scripts/flakiness/tests/test_db_utils.py @@ -0,0 +1,195 @@ +"""Tests for db_utils.py — Cloudflare D1 REST client.""" + +import os +import sys +import unittest +from unittest.mock import patch, MagicMock + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +import db_utils + + +class TestGetD1Credentials(unittest.TestCase): + _KEYS = ('CLOUDFLARE_ACCOUNT_ID', 'CLOUDFLARE_D1_DATABASE_ID', 'CLOUDFLARE_API_TOKEN') + + def setUp(self): + for key in self._KEYS: + os.environ.pop(key, None) + + def tearDown(self): + for key in self._KEYS: + os.environ.pop(key, None) + + def test_returns_tuple_when_all_present(self): + os.environ['CLOUDFLARE_ACCOUNT_ID'] = 'acct123' + os.environ['CLOUDFLARE_D1_DATABASE_ID'] = 'db456' + os.environ['CLOUDFLARE_API_TOKEN'] = 'tok789' + self.assertEqual(db_utils.get_d1_credentials(), ('acct123', 'db456', 'tok789')) + + def test_raises_when_one_var_missing(self): + os.environ['CLOUDFLARE_ACCOUNT_ID'] = 'acct123' + os.environ['CLOUDFLARE_D1_DATABASE_ID'] = 'db456' + with self.assertRaises(RuntimeError) as ctx: + db_utils.get_d1_credentials() + self.assertIn('CLOUDFLARE_API_TOKEN', str(ctx.exception)) + + def test_raises_when_all_vars_missing(self): + with self.assertRaises(RuntimeError) as ctx: + db_utils.get_d1_credentials() + msg = str(ctx.exception) + self.assertIn('CLOUDFLARE_ACCOUNT_ID', msg) + self.assertIn('CLOUDFLARE_D1_DATABASE_ID', msg) + self.assertIn('CLOUDFLARE_API_TOKEN', msg) + + def test_error_message_lists_only_missing_vars(self): + os.environ['CLOUDFLARE_ACCOUNT_ID'] = 'acct' + with self.assertRaises(RuntimeError) as ctx: + db_utils.get_d1_credentials() + msg = str(ctx.exception) + self.assertNotIn('CLOUDFLARE_ACCOUNT_ID', msg) + self.assertIn('CLOUDFLARE_D1_DATABASE_ID', msg) + 
self.assertIn('CLOUDFLARE_API_TOKEN', msg) + + +class TestD1Query(unittest.TestCase): + def _mock_response(self, json_data, status_code=200): + mock = MagicMock() + mock.json.return_value = json_data + mock.status_code = status_code + if status_code >= 400: + from requests.exceptions import HTTPError + mock.raise_for_status.side_effect = HTTPError(response=mock) + else: + mock.raise_for_status = MagicMock() + return mock + + @patch('db_utils.requests.post') + def test_success_returns_result_list(self, mock_post): + payload = {'success': True, 'result': [{'results': [{'id': 1}]}]} + mock_post.return_value = self._mock_response(payload) + result = db_utils.d1_query('acct', 'db', 'tok', 'SELECT 1') + self.assertEqual(result, payload['result']) + + @patch('db_utils.requests.post') + def test_sends_correct_url_containing_account_and_db(self, mock_post): + mock_post.return_value = self._mock_response({'success': True, 'result': []}) + db_utils.d1_query('acct123', 'db456', 'mytoken', 'SELECT 1') + url = mock_post.call_args[0][0] + self.assertIn('acct123', url) + self.assertIn('db456', url) + + @patch('db_utils.requests.post') + def test_sends_bearer_auth_header(self, mock_post): + mock_post.return_value = self._mock_response({'success': True, 'result': []}) + db_utils.d1_query('a', 'b', 'mytoken', 'SELECT 1') + headers = mock_post.call_args[1]['headers'] + self.assertEqual(headers['Authorization'], 'Bearer mytoken') + + @patch('db_utils.requests.post') + def test_sends_sql_and_params_in_body(self, mock_post): + mock_post.return_value = self._mock_response({'success': True, 'result': []}) + db_utils.d1_query('a', 'b', 'c', 'SELECT * WHERE id = ?', params=[42]) + body = mock_post.call_args[1]['json'] + self.assertEqual(body['sql'], 'SELECT * WHERE id = ?') + self.assertEqual(body['params'], [42]) + + @patch('db_utils.requests.post') + def test_omits_params_key_when_none(self, mock_post): + mock_post.return_value = self._mock_response({'success': True, 'result': []}) + 
db_utils.d1_query('a', 'b', 'c', 'SELECT 1') + body = mock_post.call_args[1]['json'] + self.assertNotIn('params', body) + + @patch('db_utils.requests.post') + def test_raises_runtime_error_on_api_failure(self, mock_post): + payload = {'success': False, 'errors': [{'message': 'bad sql'}]} + mock_post.return_value = self._mock_response(payload) + with self.assertRaises(RuntimeError) as ctx: + db_utils.d1_query('a', 'b', 'c', 'BAD SQL') + self.assertIn('D1 query failed', str(ctx.exception)) + + @patch('db_utils.requests.post') + def test_raises_http_error_on_4xx(self, mock_post): + from requests.exceptions import HTTPError + mock_post.return_value = self._mock_response({'error': 'unauthorized'}, status_code=401) + with self.assertRaises(HTTPError): + db_utils.d1_query('a', 'b', 'c', 'SELECT 1') + + +class TestD1Select(unittest.TestCase): + @patch('db_utils.d1_query') + def test_returns_row_dicts_from_results(self, mock_query): + rows = [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}] + mock_query.return_value = [{'results': rows}] + self.assertEqual(db_utils.d1_select('a', 'b', 'c', 'SELECT *'), rows) + + @patch('db_utils.d1_query') + def test_returns_empty_list_when_result_is_empty(self, mock_query): + mock_query.return_value = [] + self.assertEqual(db_utils.d1_select('a', 'b', 'c', 'SELECT *'), []) + + @patch('db_utils.d1_query') + def test_returns_empty_list_when_results_key_missing(self, mock_query): + mock_query.return_value = [{}] + self.assertEqual(db_utils.d1_select('a', 'b', 'c', 'SELECT *'), []) + + @patch('db_utils.d1_query') + def test_passes_params_through_to_d1_query(self, mock_query): + mock_query.return_value = [{'results': []}] + db_utils.d1_select('a', 'b', 'c', 'SELECT * WHERE id = ?', params=[99]) + mock_query.assert_called_once_with('a', 'b', 'c', 'SELECT * WHERE id = ?', [99]) + + +class TestGetInfraPatterns(unittest.TestCase): + @patch('db_utils.d1_select') + def test_returns_lowercase_patterns(self, mock_select): + mock_select.return_value = [ 
+ {'pattern': 'ECONNRESET'}, + {'pattern': 'Timed_Out'}, + {'pattern': 'network error'}, + ] + self.assertEqual( + db_utils.get_infra_patterns('a', 'b', 'c'), + ['econnreset', 'timed_out', 'network error'], + ) + + @patch('db_utils.d1_select') + def test_returns_empty_list_when_no_rows(self, mock_select): + mock_select.return_value = [] + self.assertEqual(db_utils.get_infra_patterns('a', 'b', 'c'), []) + + @patch('db_utils.d1_select') + def test_queries_correct_table(self, mock_select): + mock_select.return_value = [] + db_utils.get_infra_patterns('a', 'b', 'c') + sql = mock_select.call_args[0][3] + self.assertIn('known_infrastructure_issues', sql) + + +class TestLoadConfig(unittest.TestCase): + def setUp(self): + db_utils._config_cache = None + + def tearDown(self): + db_utils._config_cache = None + + def test_returns_dict_with_expected_top_level_keys(self): + config = db_utils.load_config() + for key in ('thresholds', 'severity', 'github', 'labels'): + self.assertIn(key, config) + + def test_thresholds_has_all_required_fields(self): + t = db_utils.load_config()['thresholds'] + for key in ('window_size', 'min_runs', 'flaky_min_rate', + 'flaky_max_rate', 'consecutive_failures_deterministic'): + self.assertIn(key, t) + + def test_second_call_returns_same_cached_object(self): + first = db_utils.load_config() + second = db_utils.load_config() + self.assertIs(first, second) + + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/flakiness/tests/test_report_flakiness.py b/scripts/flakiness/tests/test_report_flakiness.py new file mode 100644 index 00000000..abd82981 --- /dev/null +++ b/scripts/flakiness/tests/test_report_flakiness.py @@ -0,0 +1,305 @@ +"""Tests for report_flakiness.py — report builders and main() integration.""" + +import json +import os +import sys +import tempfile +import unittest +from unittest.mock import patch, MagicMock + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +import db_utils +from report_flakiness 
import _build_issue_body, _build_markdown_report, main + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +def _entry( + check_name='lint', + job_name='lint', + workflow_name='CI', + score=0.25, + severity='medium', + classification='flaky', + total=10, + failures=3, + flaky=2, + consecutive=0, + last_updated='2026-03-11', +): + return { + 'check_name': check_name, + 'job_name': job_name, + 'workflow_name': workflow_name, + 'flakiness_score': score, + 'severity': severity, + 'classification': classification, + 'total_runs': total, + 'failure_count': failures, + 'flaky_failures': flaky, + 'consecutive_failures': consecutive, + 'last_updated': last_updated, + } + + +# --------------------------------------------------------------------------- +# _build_issue_body +# --------------------------------------------------------------------------- +class TestBuildIssueBody(unittest.TestCase): + def test_contains_check_name(self): + self.assertIn('my-check', _build_issue_body(_entry(check_name='my-check'))) + + def test_contains_severity(self): + self.assertIn('high', _build_issue_body(_entry(severity='high'))) + + def test_contains_classification(self): + self.assertIn('flaky', _build_issue_body(_entry(classification='flaky'))) + + def test_score_rendered_as_percentage(self): + body = _build_issue_body(_entry(score=0.25)) + self.assertIn('25.00%', body) + + def test_total_runs_and_failure_count_present(self): + body = _build_issue_body(_entry(total=20, failures=5)) + self.assertIn('20', body) + self.assertIn('5', body) + + def test_failure_rate_computed_correctly(self): + body = _build_issue_body(_entry(total=10, failures=3)) + self.assertIn('30.0%', body) + + def test_zero_total_does_not_raise_and_shows_na(self): + body = _build_issue_body(_entry(total=0, failures=0)) + self.assertIn('N/A', body) + + def test_contains_nextg_steps_section(self): + body = 
_build_issue_body(_entry()) + self.assertIn('Next steps', body) + + +# --------------------------------------------------------------------------- +# _build_markdown_report +# --------------------------------------------------------------------------- +class TestBuildMarkdownReport(unittest.TestCase): + def _scores(self): + return [ + _entry('check-a', score=0.30, classification='flaky', severity='medium'), + _entry('check-b', score=0.0, classification='deterministic', + severity='deterministic', consecutive=5), + _entry('check-c', score=0.0, classification='stable', severity='stable'), + ] + + def test_contains_repo_name(self): + self.assertIn('owner/repo', _build_markdown_report(self._scores(), 'owner/repo')) + + def test_contains_summary_section(self): + self.assertIn('Summary', _build_markdown_report(self._scores(), 'owner/repo')) + + def test_flaky_count_in_summary_table(self): + report = _build_markdown_report(self._scores(), 'owner/repo') + self.assertIn('| Flaky | 1 |', report) + + def test_deterministic_count_in_summary_table(self): + report = _build_markdown_report(self._scores(), 'owner/repo') + self.assertIn('| Deterministic failures | 1 |', report) + + def test_stable_count_in_summary_table(self): + report = _build_markdown_report(self._scores(), 'owner/repo') + self.assertIn('| Stable | 1 |', report) + + def test_flaky_check_name_appears_in_report(self): + report = _build_markdown_report(self._scores(), 'owner/repo') + self.assertIn('check-a', report) + + def test_empty_scores_shows_no_flaky_placeholder(self): + report = _build_markdown_report([], 'owner/repo') + self.assertIn('_No flaky tests detected._', report) + + def test_flaky_checks_sorted_by_score_descending(self): + scores = [ + _entry('low-check', score=0.15, classification='flaky', severity='low'), + _entry('high-check', score=0.45, classification='flaky', severity='high'), + ] + report = _build_markdown_report(scores, 'repo') + high_pos = report.index('high-check') + low_pos = 
report.index('low-check') + self.assertLess(high_pos, low_pos) + + +# --------------------------------------------------------------------------- +# main() integration +# --------------------------------------------------------------------------- +class TestReportMain(unittest.TestCase): + def setUp(self): + db_utils._config_cache = None + + def tearDown(self): + db_utils._config_cache = None + + def _make_flaky_entry(self): + return _entry('lint', score=0.25, classification='flaky') + + def _make_flaky_report(self): + return { + 'flaky': [self._make_flaky_entry()], + 'deterministic': [], + 'stable': [], + } + + @patch('report_flakiness.d1_select') + @patch('report_flakiness.get_d1_credentials') + @patch('report_flakiness.requests.get') + @patch('report_flakiness.requests.post') + def test_main_writes_markdown_and_metrics_files( + self, mock_post, mock_get, mock_creds, mock_select + ): + mock_creds.return_value = ('acct', 'db', 'tok') + mock_select.return_value = [self._make_flaky_entry()] + + # GitHub issue search returns no existing issue → will create one + search_resp = MagicMock() + search_resp.status_code = 200 + search_resp.json.return_value = {'items': []} + mock_get.return_value = search_resp + + create_resp = MagicMock() + create_resp.raise_for_status = MagicMock() + create_resp.json.return_value = {'number': 1, 'state': 'open'} + mock_post.return_value = create_resp + + with tempfile.TemporaryDirectory() as tmpdir: + report_file = os.path.join(tmpdir, 'flaky_report.json') + with open(report_file, 'w') as fh: + json.dump(self._make_flaky_report(), fh) + + with patch('report_flakiness.DATA_DIR', tmpdir): + with patch('sys.argv', [ + 'report_flakiness.py', + '--repo', 'owner/repo', + '--github-token', 'tok', + '--flaky-report', report_file, + ]): + main() + + self.assertTrue( + os.path.exists(os.path.join(tmpdir, 'flakiness_report.md')) + ) + self.assertTrue( + os.path.exists(os.path.join(tmpdir, 'flakiness_metrics.json')) + ) + + 
@patch('report_flakiness.d1_select') + @patch('report_flakiness.get_d1_credentials') + @patch('report_flakiness.requests.get') + @patch('report_flakiness.requests.post') + def test_metrics_json_has_expected_structure( + self, mock_post, mock_get, mock_creds, mock_select + ): + mock_creds.return_value = ('acct', 'db', 'tok') + mock_select.return_value = [self._make_flaky_entry()] + + search_resp = MagicMock() + search_resp.status_code = 200 + search_resp.json.return_value = {'items': []} + mock_get.return_value = search_resp + + create_resp = MagicMock() + create_resp.raise_for_status = MagicMock() + create_resp.json.return_value = {'number': 1} + mock_post.return_value = create_resp + + with tempfile.TemporaryDirectory() as tmpdir: + report_file = os.path.join(tmpdir, 'flaky_report.json') + with open(report_file, 'w') as fh: + json.dump(self._make_flaky_report(), fh) + + with patch('report_flakiness.DATA_DIR', tmpdir): + with patch('sys.argv', [ + 'report_flakiness.py', + '--repo', 'owner/repo', + '--github-token', 'tok', + '--flaky-report', report_file, + ]): + main() + + metrics_path = os.path.join(tmpdir, 'flakiness_metrics.json') + with open(metrics_path) as fh: + metrics = json.load(fh) + + for key in ('generated_at', 'repo', 'summary', 'scores'): + self.assertIn(key, metrics) + for key in ('flaky', 'deterministic', 'stable'): + self.assertIn(key, metrics['summary']) + + @patch('report_flakiness.d1_select') + @patch('report_flakiness.get_d1_credentials') + def test_no_github_flag_skips_all_github_api_calls(self, mock_creds, mock_select): + mock_creds.return_value = ('acct', 'db', 'tok') + mock_select.return_value = [] + + with tempfile.TemporaryDirectory() as tmpdir: + report_file = os.path.join(tmpdir, 'flaky_report.json') + with open(report_file, 'w') as fh: + json.dump({'flaky': [], 'deterministic': [], 'stable': []}, fh) + + with patch('report_flakiness.DATA_DIR', tmpdir): + with patch('sys.argv', [ + 'report_flakiness.py', + '--repo', 'owner/repo', + 
'--github-token', 'tok', + '--flaky-report', report_file, + '--no-github', + ]): + with patch('report_flakiness.requests.post') as mock_post: + with patch('report_flakiness.requests.get') as mock_get: + main() + + mock_post.assert_not_called() + mock_get.assert_not_called() + + @patch('report_flakiness.d1_select') + @patch('report_flakiness.get_d1_credentials') + @patch('report_flakiness.requests.get') + @patch('report_flakiness.requests.post') + def test_creates_github_issue_for_new_flaky_check( + self, mock_post, mock_get, mock_creds, mock_select + ): + mock_creds.return_value = ('acct', 'db', 'tok') + mock_select.return_value = [self._make_flaky_entry()] + + # Search returns no existing issue + search_resp = MagicMock() + search_resp.status_code = 200 + search_resp.json.return_value = {'items': []} + mock_get.return_value = search_resp + + create_resp = MagicMock() + create_resp.raise_for_status = MagicMock() + create_resp.json.return_value = {'number': 42} + mock_post.return_value = create_resp + + with tempfile.TemporaryDirectory() as tmpdir: + report_file = os.path.join(tmpdir, 'flaky_report.json') + with open(report_file, 'w') as fh: + json.dump(self._make_flaky_report(), fh) + + with patch('report_flakiness.DATA_DIR', tmpdir): + with patch('sys.argv', [ + 'report_flakiness.py', + '--repo', 'owner/repo', + '--github-token', 'tok', + '--flaky-report', report_file, + ]): + main() + + # The first POST creates the issue (not a comment) + create_call = mock_post.call_args_list[0] + url = create_call[0][0] + self.assertIn('/issues', url) + self.assertNotIn('/comments', url) + + +if __name__ == '__main__': + unittest.main() diff --git a/scripts/flakiness/tests/test_retry_failures.py b/scripts/flakiness/tests/test_retry_failures.py new file mode 100644 index 00000000..1c589013 --- /dev/null +++ b/scripts/flakiness/tests/test_retry_failures.py @@ -0,0 +1,321 @@ +"""Tests for retry_failures.py — trigger_rerun, mark_flake_confirmed, main().""" + +import json +import os 
+import sys +import tempfile +import unittest +from io import StringIO +from unittest.mock import patch, MagicMock + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +import retry_failures +from retry_failures import trigger_rerun, fetch_job_conclusions, mark_flake_confirmed + + +def _write_collect_output(path, failed_jobs, run_attempt=1): + data = {'failed_jobs': failed_jobs, 'run_attempt': run_attempt} + with open(path, 'w', encoding='utf-8') as fh: + json.dump(data, fh) + + +# --------------------------------------------------------------------------- +# trigger_rerun +# --------------------------------------------------------------------------- +class TestTriggerRerun(unittest.TestCase): + @patch('retry_failures.requests.post') + def test_returns_false_on_403(self, mock_post): + resp = MagicMock() + resp.status_code = 403 + mock_post.return_value = resp + self.assertFalse(trigger_rerun('owner', 'repo', 12345, 'token')) + + @patch('retry_failures.requests.post') + def test_returns_true_on_success(self, mock_post): + resp = MagicMock() + resp.status_code = 201 + resp.raise_for_status = MagicMock() + mock_post.return_value = resp + self.assertTrue(trigger_rerun('owner', 'repo', 12345, 'token')) + + @patch('retry_failures.requests.post') + def test_calls_correct_github_endpoint(self, mock_post): + resp = MagicMock() + resp.status_code = 201 + resp.raise_for_status = MagicMock() + mock_post.return_value = resp + trigger_rerun('myowner', 'myrepo', 99, 'tok') + url = mock_post.call_args[0][0] + self.assertIn('myowner/myrepo', url) + self.assertIn('rerun-failed-jobs', url) + + +# --------------------------------------------------------------------------- +# fetch_job_conclusions +# --------------------------------------------------------------------------- +class TestFetchJobConclusions(unittest.TestCase): + @patch('retry_failures.requests.get') + def test_returns_name_to_conclusion_mapping(self, mock_get): + resp = MagicMock() + 
resp.raise_for_status = MagicMock() + resp.links = {} + resp.json.return_value = { + 'jobs': [ + {'name': 'lint', 'conclusion': 'success'}, + {'name': 'test', 'conclusion': 'failure'}, + ] + } + mock_get.return_value = resp + result = fetch_job_conclusions('owner', 'repo', 12345, 'tok') + self.assertEqual(result, {'lint': 'success', 'test': 'failure'}) + + @patch('retry_failures.requests.get') + def test_follows_pagination_links(self, mock_get): + page1 = MagicMock() + page1.raise_for_status = MagicMock() + page1.links = {'next': {'url': 'https://api.github.com/page2'}} + page1.json.return_value = {'jobs': [{'name': 'job1', 'conclusion': 'success'}]} + + page2 = MagicMock() + page2.raise_for_status = MagicMock() + page2.links = {} + page2.json.return_value = {'jobs': [{'name': 'job2', 'conclusion': 'failure'}]} + + mock_get.side_effect = [page1, page2] + result = fetch_job_conclusions('owner', 'repo', 12345, 'tok') + self.assertIn('job1', result) + self.assertIn('job2', result) + self.assertEqual(mock_get.call_count, 2) + + @patch('retry_failures.requests.get') + def test_returns_none_for_in_progress_job(self, mock_get): + # A job with conclusion=None means it is still in-progress; + # the code returns None directly (not 'unknown') in that case. 
+ resp = MagicMock() + resp.raise_for_status = MagicMock() + resp.links = {} + resp.json.return_value = {'jobs': [{'name': 'job', 'conclusion': None}]} + mock_get.return_value = resp + result = fetch_job_conclusions('owner', 'repo', 12345, 'tok') + self.assertIsNone(result['job']) + + +# --------------------------------------------------------------------------- +# mark_flake_confirmed +# --------------------------------------------------------------------------- +class TestMarkFlakeConfirmed(unittest.TestCase): + @patch('retry_failures.d1_query') + def test_calls_d1_query_once(self, mock_d1): + mock_d1.return_value = [{'results': []}] + mark_flake_confirmed('acct', 'db', 'tok', 99999, 'test-job', 'owner/repo', 2) + mock_d1.assert_called_once() + + @patch('retry_failures.d1_query') + def test_sql_targets_ci_run_history(self, mock_d1): + mock_d1.return_value = [{'results': []}] + mark_flake_confirmed('acct', 'db', 'tok', 99999, 'test-job', 'owner/repo', 2) + sql = mock_d1.call_args[0][3] + self.assertIn('INSERT INTO ci_run_history', sql) + + @patch('retry_failures.d1_query') + def test_params_include_run_id_attempt_job_and_repo(self, mock_d1): + mock_d1.return_value = [{'results': []}] + mark_flake_confirmed('acct', 'db', 'tok', 99999, 'test-job', 'owner/repo', 2) + params = mock_d1.call_args[0][4] + self.assertIn(99999, params) # workflow_run_id + self.assertIn(2, params) # new_attempt + self.assertIn('test-job', params) + self.assertIn('owner/repo', params) + + @patch('retry_failures.d1_query') + def test_inserts_flake_confirmed_conclusion(self, mock_d1): + mock_d1.return_value = [{'results': []}] + mark_flake_confirmed('acct', 'db', 'tok', 99999, 'test-job', 'owner/repo', 2) + sql = mock_d1.call_args[0][3] + self.assertIn('flake_confirmed', sql) + + +# --------------------------------------------------------------------------- +# main() +# --------------------------------------------------------------------------- +class TestRetryMain(unittest.TestCase): + 
    @patch('retry_failures.get_d1_credentials')
    def test_skips_when_already_retried_run_attempt_gt_1(self, mock_creds):
        # run_attempt > 1 means this run is already a retry: main() must
        # short-circuit with 'skipped_already_retried' for the failed job.
        with tempfile.TemporaryDirectory() as tmpdir:
            collect_file = os.path.join(tmpdir, 'collect.json')
            _write_collect_output(collect_file, ['test'], run_attempt=2)

            with patch('sys.argv', [
                'retry_failures.py',
                '--workflow-run-id', '12345',
                '--repo', 'owner/repo',
                '--collect-output', collect_file,
            ]):
                with patch('sys.stdout', new_callable=StringIO) as mock_out:
                    retry_failures.main()
                    output = json.loads(mock_out.getvalue())

        self.assertEqual(output['test'], 'skipped_already_retried')
        # Credentials are only resolved when a rerun is actually attempted,
        # so the early-skip path must never call get_d1_credentials().
        mock_creds.assert_not_called()

    def test_empty_failed_jobs_outputs_empty_dict(self):
        # No failed jobs in the collect output -> nothing to retry; main()
        # should emit an empty JSON object and do no further work.
        with tempfile.TemporaryDirectory() as tmpdir:
            collect_file = os.path.join(tmpdir, 'collect.json')
            _write_collect_output(collect_file, [], run_attempt=1)

            with patch('sys.argv', [
                'retry_failures.py',
                '--workflow-run-id', '12345',
                '--repo', 'owner/repo',
                '--collect-output', collect_file,
            ]):
                with patch('sys.stdout', new_callable=StringIO) as mock_out:
                    retry_failures.main()
                    output = json.loads(mock_out.getvalue())

        self.assertEqual(output, {})

    @patch('retry_failures.get_d1_credentials')
    @patch('retry_failures.trigger_rerun')
    def test_rerun_not_permitted_when_trigger_returns_false(
        self, mock_trigger, mock_creds
    ):
        # trigger_rerun() returning False (e.g. a 403 from GitHub) must be
        # reported as 'rerun_not_permitted' rather than raising.
        mock_creds.return_value = ('acct', 'db', 'tok')
        mock_trigger.return_value = False

        with tempfile.TemporaryDirectory() as tmpdir:
            collect_file = os.path.join(tmpdir, 'collect.json')
            _write_collect_output(collect_file, ['test'], run_attempt=1)

            with patch('sys.argv', [
                'retry_failures.py',
                '--workflow-run-id', '12345',
                '--repo', 'owner/repo',
                '--collect-output', collect_file,
            ]):
                with patch('sys.stdout', new_callable=StringIO) as mock_out:
                    retry_failures.main()
                    output = json.loads(mock_out.getvalue())

        self.assertEqual(output['test'], 'rerun_not_permitted')

    @patch('retry_failures.get_d1_credentials')
    @patch('retry_failures.trigger_rerun')
    @patch('retry_failures.poll_run_completion')
    def test_poll_timeout_result_when_poll_returns_none(
        self, mock_poll, mock_trigger, mock_creds
    ):
        # poll_run_completion() returning (None, None) indicates the rerun
        # never finished within the polling window -> result 'poll_timeout'.
        mock_creds.return_value = ('acct', 'db', 'tok')
        mock_trigger.return_value = True
        mock_poll.return_value = (None, None)

        with tempfile.TemporaryDirectory() as tmpdir:
            collect_file = os.path.join(tmpdir, 'collect.json')
            _write_collect_output(collect_file, ['test'], run_attempt=1)

            with patch('sys.argv', [
                'retry_failures.py',
                '--workflow-run-id', '12345',
                '--repo', 'owner/repo',
                '--collect-output', collect_file,
            ]):
                with patch('sys.stdout', new_callable=StringIO) as mock_out:
                    retry_failures.main()
                    output = json.loads(mock_out.getvalue())

        self.assertEqual(output['test'], 'poll_timeout')

    @patch('retry_failures.mark_flake_confirmed')
    @patch('retry_failures.fetch_job_conclusions')
    @patch('retry_failures.poll_run_completion')
    @patch('retry_failures.trigger_rerun')
    @patch('retry_failures.get_d1_credentials')
    def test_confirmed_flake_when_job_passes_on_retry(
        self, mock_creds, mock_trigger, mock_poll, mock_fetch, mock_mark
    ):
        # A job that failed on attempt 1 but succeeds on the rerun (attempt 2)
        # is a confirmed flake: main() must report 'confirmed_flake' and
        # record it via mark_flake_confirmed with the new attempt number.
        mock_creds.return_value = ('acct', 'db', 'tok')
        mock_trigger.return_value = True
        mock_poll.return_value = ('success', 2)
        mock_fetch.return_value = {'test': 'success'}

        with tempfile.TemporaryDirectory() as tmpdir:
            collect_file = os.path.join(tmpdir, 'collect.json')
            _write_collect_output(collect_file, ['test'], run_attempt=1)

            with patch('sys.argv', [
                'retry_failures.py',
                '--workflow-run-id', '12345',
                '--repo', 'owner/repo',
                '--collect-output', collect_file,
            ]):
                with patch('sys.stdout', new_callable=StringIO) as mock_out:
                    retry_failures.main()
                    output = json.loads(mock_out.getvalue())

        self.assertEqual(output['test'], 'confirmed_flake')
        # Positional args mirror mark_flake_confirmed(acct, db, tok,
        # workflow_run_id, job_name, repo, new_attempt).
        mock_mark.assert_called_once_with('acct', 'db', 'tok', 12345, 'test', 'owner/repo', 2)

@patch('retry_failures.d1_query') + @patch('retry_failures.fetch_job_conclusions') + @patch('retry_failures.poll_run_completion') + @patch('retry_failures.trigger_rerun') + @patch('retry_failures.get_d1_credentials') + def test_real_failure_when_job_fails_on_retry( + self, mock_creds, mock_trigger, mock_poll, mock_fetch, mock_d1 + ): + mock_creds.return_value = ('acct', 'db', 'tok') + mock_trigger.return_value = True + mock_poll.return_value = ('failure', 2) + mock_fetch.return_value = {'test': 'failure'} + + with tempfile.TemporaryDirectory() as tmpdir: + collect_file = os.path.join(tmpdir, 'collect.json') + _write_collect_output(collect_file, ['test'], run_attempt=1) + + with patch('sys.argv', [ + 'retry_failures.py', + '--workflow-run-id', '12345', + '--repo', 'owner/repo', + '--collect-output', collect_file, + ]): + with patch('sys.stdout', new_callable=StringIO) as mock_out: + retry_failures.main() + output = json.loads(mock_out.getvalue()) + + self.assertEqual(output['test'], 'real_failure') + mock_d1.assert_not_called() + + @patch('retry_failures.mark_flake_confirmed') + @patch('retry_failures.fetch_job_conclusions') + @patch('retry_failures.poll_run_completion') + @patch('retry_failures.trigger_rerun') + @patch('retry_failures.get_d1_credentials') + def test_multiple_jobs_classified_independently( + self, mock_creds, mock_trigger, mock_poll, mock_fetch, mock_mark + ): + mock_creds.return_value = ('acct', 'db', 'tok') + mock_trigger.return_value = True + mock_poll.return_value = ('success', 2) + mock_fetch.return_value = {'job-a': 'success', 'job-b': 'failure'} + + with tempfile.TemporaryDirectory() as tmpdir: + collect_file = os.path.join(tmpdir, 'collect.json') + _write_collect_output(collect_file, ['job-a', 'job-b'], run_attempt=1) + + with patch('sys.argv', [ + 'retry_failures.py', + '--workflow-run-id', '12345', + '--repo', 'owner/repo', + '--collect-output', collect_file, + ]): + with patch('sys.stdout', new_callable=StringIO) as mock_out: + 
retry_failures.main() + output = json.loads(mock_out.getvalue()) + + self.assertEqual(output['job-a'], 'confirmed_flake') + self.assertEqual(output['job-b'], 'real_failure') + + +if __name__ == '__main__': + unittest.main() diff --git a/src/database.py b/src/database.py index ad946a5f..127a2278 100644 --- a/src/database.py +++ b/src/database.py @@ -392,3 +392,45 @@ async def delete_timeline_from_db(env, owner, repo, pr_number): print(f"Database: Deleted timeline cache for {owner}/{repo}#{pr_number}") except Exception as e: print(f"Error deleting timeline from database for {owner}/{repo}#{pr_number}: {str(e)}") + + +async def get_all_flakiness_scores(db): + """Return a dict keyed by (repo, workflow_name, check_name, job_name). + + Returns an empty dict if the table does not exist yet (before the first + sync from the flakiness detector workflow). + """ + try: + result = await db.prepare( + 'SELECT repo, workflow_name, check_name, job_name, ' + 'flakiness_score, severity, classification ' + 'FROM flakiness_scores' + ).all() + rows = result.results.to_py() if hasattr(result.results, 'to_py') else list(result.results) + return { + (r['repo'], r['workflow_name'], r['check_name'], r['job_name']): dict(r) + for r in rows + } + except Exception as e: + print(f"Flakiness: Error loading all scores: {str(e)}") + return {} + + +async def get_flakiness_score(db, repo, workflow_name, check_name, job_name): + """Return a single flakiness_scores row as a dict, or None if not found.""" + try: + result = await db.prepare( + 'SELECT repo, workflow_name, check_name, job_name, ' + 'flakiness_score, severity, classification ' + 'FROM flakiness_scores ' + 'WHERE repo = ? AND workflow_name = ? AND check_name = ? AND job_name = ?' 
+ ).bind(repo, workflow_name, check_name, job_name).first() + if not result: + return None + return result.to_py() if hasattr(result, 'to_py') else dict(result) + except Exception as e: + print( + "Flakiness: Error loading score for " + f"{repo}/{workflow_name}/{check_name}/{job_name}: {str(e)}" + ) + return None diff --git a/src/handlers.py b/src/handlers.py index 57df693c..c4480fee 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -18,7 +18,7 @@ _READINESS_CACHE_TTL, _RATE_LIMIT_CACHE_TTL, _READINESS_RATE_LIMIT, _READINESS_RATE_WINDOW, _rate_limit_cache ) -from database import get_db, upsert_pr +from database import get_db, upsert_pr, get_all_flakiness_scores from github_api import ( fetch_pr_data, fetch_pr_timeline_data, fetch_paginated_data, verify_github_signature, fetch_multiple_prs_batch, fetch_org_repos @@ -1584,8 +1584,14 @@ async def _run_readiness_analysis(env, pr, pr_id, github_token): # Classify review health review_classification, review_score = classify_review_health(review_data) + # Load flakiness scores from D1 (graceful fallback on missing table) + flakiness_scores = await get_all_flakiness_scores(get_db(env)) + # Calculate combined readiness - readiness = calculate_pr_readiness(pr, review_classification, review_score) + readiness = calculate_pr_readiness( + pr, review_classification, review_score, + flakiness_scores=flakiness_scores, + ) # Build response data with percentage formatting response_data = { diff --git a/src/utils.py b/src/utils.py index 92957473..0376e771 100644 --- a/src/utils.py +++ b/src/utils.py @@ -401,52 +401,68 @@ def classify_review_health(review_data): return (classification, score) -def calculate_ci_confidence(checks_passed, checks_failed, checks_skipped): +def calculate_ci_confidence(checks_passed, checks_failed, checks_skipped, + known_flaky_count=0): """ - Calculate CI confidence score from check results - + Calculate CI confidence score from check results. 
+ Args: checks_passed: Number of passing checks checks_failed: Number of failing checks checks_skipped: Number of skipped checks - + known_flaky_count: Number of checks whose flakiness classification is + 'flaky' with severity 'low' or 'medium' (from flakiness_scores D1 + table). When failures are at or below this count the per-failure + penalty is reduced from 0.50 to 0.20, reflecting that they are + likely intermittent rather than real regressions. + Returns: int: Confidence score 0-100 """ total_checks = checks_passed + checks_failed + checks_skipped - + # No checks = neutral score if total_checks == 0: return 50 - - # All failed = 0 + + # All failed = 0 (even if some are known-flaky, this many failures is a signal) if checks_passed == 0 and checks_failed > 0: return 0 - + # All passed = 100 if checks_failed == 0 and checks_passed > 0: return 100 - - # Calculate based on pass rate, penalize failures more than skipped + pass_rate = checks_passed / total_checks - fail_rate = checks_failed / total_checks skip_rate = checks_skipped / total_checks - - # Weighted score: passes add, failures subtract (reduced for flaky test tolerance), skips slightly reduce - score = (pass_rate * 100) - (fail_rate * 50) - (skip_rate * 20) - + + # Apply a reduced penalty when failures are plausibly flaky: + # - failures <= known_flaky_count → 0.20 penalty weight (weakly penalised) + # - otherwise → 0.50 penalty weight (standard) + if known_flaky_count > 0 and checks_failed <= known_flaky_count: + fail_penalty = 0.20 + else: + fail_penalty = 0.50 + + fail_rate = checks_failed / total_checks + score = (pass_rate * 100) - (fail_rate * fail_penalty * 100) - (skip_rate * 20) + return max(0, min(100, int(score))) -def calculate_pr_readiness(pr_data, review_classification, review_score): +def calculate_pr_readiness(pr_data, review_classification, review_score, + flakiness_scores=None): """ - Calculate overall PR readiness combining CI and review health - + Calculate overall PR readiness 
combining CI and review health. + Args: pr_data: Dict with PR info including CI checks review_classification: str from classify_review_health review_score: int from classify_review_health - + flakiness_scores: Optional dict[(repo, workflow_name, check_name, job_name) -> row] from + get_all_flakiness_scores(). When provided, the CI confidence score + will use a reduced penalty for failures that are known-flaky. + Returns: Dict with: { @@ -460,11 +476,27 @@ def calculate_pr_readiness(pr_data, review_classification, review_score): 'recommendations': List[str] } """ + # Count how many low/medium-severity flaky checks are known for this repo. + # We cannot match by check name here because pr_data only carries aggregate + # counts, so we use the total as a conservative upper bound. + known_flaky_count = 0 + if flakiness_scores: + repo_owner = pr_data.get('repo_owner') + repo_name = pr_data.get('repo_name') + repo = f"{repo_owner}/{repo_name}" if repo_owner and repo_name else None + known_flaky_count = sum( + 1 for row in flakiness_scores.values() + if row.get('repo') == repo + and row.get('classification') == 'flaky' + and row.get('severity') in ('low', 'medium') + ) + # Calculate CI score ci_score = calculate_ci_confidence( pr_data.get('checks_passed', 0), pr_data.get('checks_failed', 0), - pr_data.get('checks_skipped', 0) + pr_data.get('checks_skipped', 0), + known_flaky_count=known_flaky_count, ) # Weighted combination: 45% CI, 55% Review (reduced CI weight due to flaky tests) @@ -509,11 +541,31 @@ def calculate_pr_readiness(pr_data, review_classification, review_score): checks_skipped = pr_data.get('checks_skipped', 0) if checks_failed > 2: - blockers.append(f"{checks_failed} CI check(s) failing") - recommendations.append("Fix failing CI checks before merging") + if known_flaky_count > 0 and checks_failed <= known_flaky_count: + warnings.append( + f"{checks_failed} CI check(s) failing — {known_flaky_count} known-flaky " + f"check(s) on record (reduced penalty 
applied)" + ) + recommendations.append( + "Review flakiness data; failures may be intermittent — see flakiness issues" + ) + else: + blockers.append(f"{checks_failed} CI check(s) failing") + recommendations.append("Fix failing CI checks before merging") elif checks_failed > 0: - warnings.append(f"{checks_failed} CI check(s) failing (possibly flaky tests)") - recommendations.append("Verify if failures are from known flaky tests (Selenium, Docker)") + if known_flaky_count > 0: + warnings.append( + f"{checks_failed} CI check(s) failing " + f"({known_flaky_count} known-flaky check(s) on record — likely intermittent)" + ) + recommendations.append( + "Check flakiness issues for this repo; failure may not be a regression" + ) + else: + warnings.append(f"{checks_failed} CI check(s) failing (possibly flaky tests)") + recommendations.append( + "Verify if failures are from known flaky tests (Selenium, Docker)" + ) if checks_skipped > 0: warnings.append(f"{checks_skipped} CI check(s) skipped")