|
| 1 | +import json |
| 2 | +import logging |
| 3 | +from typing import List |
| 4 | + |
| 5 | +class Vulnerability: |
| 6 | + """ |
| 7 | + Vulnerability is an object for marshalling vulnerability findings |
| 8 | + from Inspector's JSON into a Python object that can be queried and manipulated. |
| 9 | + """ |
| 10 | + def __init__(self): |
| 11 | + self.vuln_id = "null" |
| 12 | + self.severity = "null" |
| 13 | + self.description = "null" |
| 14 | + self.resource_type = "null" |
| 15 | + self.resource_id = "null" |
| 16 | + |
| 17 | +def get_json_value(key: str, inspector_scan_json: dict): |
| 18 | + value = inspector_scan_json.get(key) |
| 19 | + return value |
| 20 | + |
| 21 | +def get_json_value_or_throw_fatal_error(key: str, inspector_scan_json: dict): |
| 22 | + value = get_json_value(key, inspector_scan_json) |
| 23 | + if not value: |
| 24 | + logging.fatal(f"expected JSON with key '{key}' but it was not found") |
| 25 | + return value |
| 26 | + |
| 27 | +def get_inspector_scan_body(inspector_scan_json): |
| 28 | + scan_json = json.loads(inspector_scan_json) |
| 29 | + scan_body = get_json_value("findings", scan_json) |
| 30 | + if not scan_body: |
| 31 | + logging.fatal("expected JSON with key 'findings' but none was found") |
| 32 | + return scan_body |
| 33 | + |
| 34 | +def parse_vulns(inspector_scan_json: str) -> List[Vulnerability]: |
| 35 | + scan_body = get_inspector_scan_body(inspector_scan_json) |
| 36 | + vulnerabilities = [] |
| 37 | + |
| 38 | + for finding in scan_body: |
| 39 | + vuln = Vulnerability() |
| 40 | + vuln.vuln_id = finding.get("arn", "null") |
| 41 | + vuln.severity = finding.get("severity", "null") |
| 42 | + vuln.description = finding.get("description", "null") |
| 43 | + vuln.resource_type = finding.get("resource", {}).get("type", "null") |
| 44 | + vuln.resource_id = finding.get("resource", {}).get("id", "null") |
| 45 | + vulnerabilities.append(vuln) |
| 46 | + |
| 47 | + return vulnerabilities |
| 48 | + |
| 49 | +def generate_csv_report(vulnerabilities: List[Vulnerability]) -> str: |
| 50 | + csv_output = "ID,SEVERITY,DESCRIPTION,RESOURCE_TYPE,RESOURCE_ID\n" |
| 51 | + |
| 52 | + for vuln in vulnerabilities: |
| 53 | + clean_description = vuln.description.replace(',', '') |
| 54 | + csv_row = f"{vuln.vuln_id},{vuln.severity},{clean_description},{vuln.resource_type},{vuln.resource_id}\n" |
| 55 | + csv_output += csv_row |
| 56 | + |
| 57 | + return csv_output |
| 58 | + |
| 59 | +def write_csv_report(inspector_scan_path: str, dst_file: str) -> bool: |
| 60 | + with open(inspector_scan_path, 'r') as file: |
| 61 | + inspector_scan_json = file.read() |
| 62 | + |
| 63 | + vulnerabilities = parse_vulns(inspector_scan_json) |
| 64 | + if not vulnerabilities: |
| 65 | + logging.info("No vulnerabilities found, skipping CSV report") |
| 66 | + return False |
| 67 | + |
| 68 | + csv_output = generate_csv_report(vulnerabilities) |
| 69 | + logging.info(f"Writing vulnerability CSV report to: {dst_file}") |
| 70 | + with open(dst_file, "w") as file: |
| 71 | + file.write(csv_output) |
| 72 | + |
| 73 | + return True |
0 commit comments