|
| 1 | +import base64 |
| 2 | +import json |
| 3 | +import pytest |
| 4 | +from fastapi.testclient import TestClient |
| 5 | +from prometheus_client import REGISTRY |
| 6 | + |
| 7 | +from src.main import app |
| 8 | +from src.config import get_settings |
| 9 | + |
| 10 | +@pytest.fixture |
| 11 | +def client(monkeypatch): |
| 12 | + """Fixture to provide a TestClient with a fixed signing key and cleared settings cache.""" |
| 13 | + # Requirement: All tests must set QR_SIGNING_KEY to a 32-character test string |
| 14 | + monkeypatch.setenv("QR_SIGNING_KEY", "q" * 32) |
| 15 | + get_settings.cache_clear() |
| 16 | + return TestClient(app) |
| 17 | + |
| 18 | +def test_qr_generate_validate_audit_flow(client): |
| 19 | + """ |
| 20 | + End-to-end test for the QR lifecycle: |
| 21 | + 1. Generate a signed QR token. |
| 22 | + 2. Validate the token successfully. |
| 23 | + 3. Validate a tampered token (invalid signature). |
| 24 | + 4. Validate a token with extra fields (signature mismatch). |
| 25 | + 5. Verify the audit log contains valid and invalid entries. |
| 26 | + 6. Verify Prometheus metrics. |
| 27 | + """ |
| 28 | + ticket_id = "E2E-TEST-001" |
| 29 | + event_name = "E2E-Festival" |
| 30 | + |
| 31 | + # --- Step 1: Generate QR --- |
| 32 | + gen_payload = { |
| 33 | + "ticket_id": ticket_id, |
| 34 | + "event": event_name, |
| 35 | + "user": "tester@example.com" |
| 36 | + } |
| 37 | + resp = client.post("/generate-qr", json=gen_payload) |
| 38 | + |
| 39 | + # Assertions 1 & 2: Status 200, valid PNG, and token presence |
| 40 | + assert resp.status_code == 200 |
| 41 | + data = resp.json() |
| 42 | + assert "qr_base64" in data |
| 43 | + assert "token" in data # Extracted signed token requirement |
| 44 | + |
| 45 | + qr_content = base64.b64decode(data["qr_base64"]) |
| 46 | + assert qr_content.startswith(b"\x89PNG"), "QR code must be a PNG image" |
| 47 | + |
| 48 | + token_str = data["token"] |
| 49 | + token_obj = json.loads(token_str) |
| 50 | + |
| 51 | + # Store initial metrics |
| 52 | + def get_metric(res): |
| 53 | + return REGISTRY.get_sample_value("qr_validations_total", {"result": res}) or 0 |
| 54 | + |
| 55 | + m_valid_start = get_metric("valid") |
| 56 | + m_invalid_start = get_metric("invalid") |
| 57 | + |
| 58 | + # --- Step 2: Validate (Successful) --- |
| 59 | + val_resp = client.post("/validate-qr", json={"qr_text": token_str}) |
| 60 | + |
| 61 | + # Assertion 3: Valid scan returns True and correct metadata |
| 62 | + assert val_resp.status_code == 200 |
| 63 | + val_data = val_resp.json() |
| 64 | + assert val_data["isValid"] is True |
| 65 | + assert val_data["metadata"]["ticket_id"] == ticket_id |
| 66 | + assert val_data["metadata"]["event"] == event_name |
| 67 | + |
| 68 | + # --- Step 3: Validate (Tampered signature) --- |
| 69 | + tampered_token = token_obj.copy() |
| 70 | + tampered_token["sig"] = "invalid_signature_string" |
| 71 | + resp_tampered = client.post("/validate-qr", json={"qr_text": json.dumps(tampered_token)}) |
| 72 | + |
| 73 | + # Assertion 4: Tampered signature returns False |
| 74 | + assert resp_tampered.status_code == 200 |
| 75 | + assert resp_tampered.json()["isValid"] is False |
| 76 | + |
| 77 | + # --- Step 4: Validate (Extra field / Tampered payload) --- |
| 78 | + extra_field_token = token_obj.copy() |
| 79 | + extra_field_token["fraud"] = "injected" |
| 80 | + resp_extra = client.post("/validate-qr", json={"qr_text": json.dumps(extra_field_token)}) |
| 81 | + |
| 82 | + # Assertion 5: Extra field (tampering) returns False |
| 83 | + assert resp_extra.status_code == 200 |
| 84 | + assert resp_extra.json()["isValid"] is False |
| 85 | + |
| 86 | + # --- Step 5: Audit Log --- |
| 87 | + log_resp = client.get(f"/qr/scan-log/{ticket_id}") |
| 88 | + |
| 89 | + # Assertion 6: Audit log contains valid and invalid entries |
| 90 | + assert log_resp.status_code == 200 |
| 91 | + logs = log_resp.json() |
| 92 | + assert len(logs) >= 2, "Should have at least one valid and one invalid log entry" |
| 93 | + |
| 94 | + has_valid = any(l["is_valid"] is True for l in logs) |
| 95 | + has_invalid = any(l["is_valid"] is False for l in logs) |
| 96 | + assert has_valid and has_invalid, "Audit log must contain both valid and invalid attempts" |
| 97 | + |
| 98 | + # --- Step 6: Prometheus Metrics --- |
| 99 | + # Assertion 7: Valid scan incremented counter |
| 100 | + assert get_metric("valid") == m_valid_start + 1 |
| 101 | + |
| 102 | + # Assertion 8: Invalid scans incremented counter (2 invalid attempts in steps 3-4) |
| 103 | + assert get_metric("invalid") >= m_invalid_start + 2 |
0 commit comments