diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..b38df29 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "daily" diff --git a/README.md b/README.md index 1dde827..29fa548 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,8 @@ To check the options available in the CLI, you can run the following command: * **--disable-apis**: Disable API requests to get external information +* **--verbose**: Verbose mode + ### Recommendations * Previous project build/compilation @@ -83,7 +85,7 @@ To check the options available in the CLI, you can run the following command: ## Configuration -TO DO +The Fafnir configuration file, defined with the `--configuration` flag, may be defined with a YAML structure. You can see a full example of this file in [this repository](configuration_file/fafnir_config.yml). You can download it, fill it and then use it in your future scans. ## Security tools @@ -120,6 +122,28 @@ To know the supported languages and technologies, review the integrated tools. ## Supported languages and technologies +|Language|Supported SAST|Supported extensions (SAST)|Supported SCA|Supported SCA files| +|----|--------|------|---------|-----------| +|Python|:white_check_mark:|.py / .pyc|:white_check_mark:|Pipfile.lock / poetry.lock / requirements.txt / .egg / .dist-info/META-DATA / envs/*/conda-meta/*.json| +|Java|:white_check_mark:|.java / .jar / .war / .ear|:white_check_mark:|| +|JS & TS|:white_check_mark:|.js / .ts|:white_check_mark:|| +|Go|:white_check_mark:|.go|:white_check_mark:|| +|C / C++ / C#|:white_check_mark:|.c / .cpp / .cs|:white_check_mark:|| +|Kotlin|:white_check_mark:|.kt|:white_check_mark:|| +|PHP|:white_check_mark:|.php|:white_check_mark:|| +|Ruby on rails|:x:||:x:|| +|Lua|:x:||:x:|| +|Batch file|:x:||:x:|| +|Powershell script|:x:||:x:|| + +|Technologies|Supported|Supported extensions| +|--------|-------|------| +|Secrets|:white_check_mark:|All files| +|IaC|:white_check_mark:|.tf| +|Containers|:white_check_mark:|Dockerfile / Local builded images| +|SBOM generator|:white_check_mark:|Python / Java / JS / Go / C / C++ / C# / Kotlin / PHP| +|DAST|:x:|URL| + ## FAQ ### PermissionError due to Docker container run diff --git a/configuration_file/fafnir_config.yml b/configuration_file/fafnir_config.yml index 63f0d33..115da41 100644 --- a/configuration_file/fafnir_config.yml +++ b/configuration_file/fafnir_config.yml @@ -1,4 +1,4 @@ -exclude-tools: # Uncomment the tools you want to exclude from analysis +#exclude-tools: # Uncomment the tools you want to exclude from analysis #- semgrep #- bandit #- find-sec-bugs diff --git a/src/config/config.yml b/src/config/config.yml index 2b9de57..2e956c6 100644 --- a/src/config/config.yml +++ b/src/config/config.yml @@ -57,7 +57,7 @@ technologies: extensions: ['.js'] tools: ['trivy-sca','osv-scanner','syft','semgrep','gitleaks'] typescript: - extensions: ['.js'] + extensions: ['.ts'] tools: ['trivy-sca','osv-scanner','syft','semgrep','gitleaks'] go: extensions: ['.go'] diff --git a/src/core/technology_detection/detect_technologies.py b/src/core/technology_detection/detect_technologies.py index 8d0d981..900ba32 100644 --- a/src/core/technology_detection/detect_technologies.py +++ b/src/core/technology_detection/detect_technologies.py @@ -1,41 +1,58 @@ import os -#from guesslang import Guess - -def _detect_technologies (code_path): - technologies = [] - for root,_,f_names in os.walk(code_path): - for f in f_names: - tech = _guess_programming_language_from_extension(os.path.join(root, f)) - if tech is not None: - technologies.append(tech) - return list(dict.fromkeys(technologies)) - -def _guess_programming_language_from_extension (filepath): +from typing import List, Dict + +def _detect_technologies(code_path: str) -> List[str]: + """ + Detects the programming technologies used in the given code path. + + Args: + code_path: The path to the directory containing the code. + + Returns: + A list of programming technologies used in the code. + """ + return list(set(_guess_programming_language_from_extension(os.path.join(root, f)) + for root, _, f_names in os.walk(code_path) + for f in f_names + if _guess_programming_language_from_extension(os.path.join(root, f)))) + +def _guess_programming_language_from_extension(filepath: str) -> str: + """ + Guesses the programming language based on the file extension of the given file path. + + Parameters: + filepath (str): The path of the file. + + Returns: + str: The file extension indicating the programming language. + """ _, file_extension = os.path.splitext(filepath) return file_extension -def select_tools(scan_fullpath, config, fafnir_configuration): +def select_tools(scan_fullpath: str, config: Dict[str, dict], fafnir_configuration: Dict[str, list]) -> List[str]: + """ + Generates a list of tools based on the detected technologies in the given scan_fullpath. - list_tools = [] + Parameters: + - scan_fullpath (str): The full path of the scan. + - config (Dict[str, dict]): The configuration dictionary. + - fafnir_configuration (Dict[str, list]): The fafnir configuration dictionary. - technologies = _detect_technologies(scan_fullpath) + Returns: + - list[str]: A list of tools based on the detected technologies, excluding any tools specified in the fafnir configuration. - exclude_tools = [] - if fafnir_configuration.get('exclude-tools') is not None: - exclude_tools = fafnir_configuration.get('exclude-tools') + """ + technologies = _detect_technologies(scan_fullpath) + exclude_tools = fafnir_configuration.get('exclude-tools', []) - for tech in list(config.get('technologies').keys()): - supported_technologies = config.get('technologies').get(tech).get('extensions') - for code_technology in technologies: - if code_technology in supported_technologies: - list_tools.extend(x for x in config.get('technologies').get(tech).get('tools') if x not in list_tools and x not in exclude_tools) + list_tools = [ + tool + for tech in config.get('technologies').keys() + for code_technology in technologies + if code_technology in config.get('technologies').get(tech).get('extensions') + for tool in config.get('technologies').get(tech).get('tools') + if tool not in exclude_tools + ] return list_tools - -# Deprecated: Not so eficient -#def _guess_programming_language_from_file (filepath): -# with open(filepath, 'r') as file: -# file_content = file.read() -# guess = Guess() -# return guess.language_name(file_content) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 5a6ad26..cbdfefd 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,8 @@ import yaml import docker import click +import os +from typing import Optional from core.run_containers import run_tools from config.banner.banner import print_banner @@ -10,16 +12,29 @@ VERSION = '1.0.0' -#volume = "/home/julian/workspace/fafnir/src" - @click.command() @click.argument('scan_fullpath') @click.option("--verbose", is_flag=True, show_default=True, default=False, help="Verbose mode") @click.option("--configuration", help="Fafnir configuration file") -@click.option("--asynchronous", is_flag=True, show_default=True, default=False, help="Asynchronous mode (run multiple containers at same time)") -@click.option("--output-path", default=".", help="Path to store the tools/Fafnir report") +@click.option("--asynchronous", is_flag=True, show_default=True, default=False, help="Asynchronous mode") +@click.option("--output-path", default=os.path.join(os.path.abspath("."),"reports"), help="Path to store the tools/Fafnir report") @click.option("--disable-apis", is_flag=True, show_default=True, default=True, help="Disable API requests") -def main(scan_fullpath, verbose, configuration, asynchronous, output_path, disable_apis): +def main(scan_fullpath: str, verbose: bool, configuration: Optional[str], + asynchronous: bool, output_path: str, disable_apis: bool) -> None: + """ + Run the main function of the program. + + Args: + scan_fullpath (str): The full path of the scan. + verbose (bool): Flag indicating whether to run in verbose mode. + configuration (str): The file path of the Fafnir configuration file. + asynchronous (bool): Flag indicating whether to run in asynchronous mode. + output_path (str): The path to store the Fafnir report. + disable_apis (bool): Flag indicating whether to disable API requests. + + Returns: + None + """ print_banner(VERSION) @@ -27,10 +42,7 @@ def main(scan_fullpath, verbose, configuration, asynchronous, output_path, disab config = yaml.safe_load(open("src/config/config.yml")) - if configuration: - fafnir_config = yaml.safe_load(open(configuration)) - else: - fafnir_config = None + fafnir_config = yaml.safe_load(open(configuration)) if configuration else None tools = select_tools(scan_fullpath, config, fafnir_config) diff --git a/src/report/data_schema/dependency.py b/src/report/data_schema/dependency.py index 0d77241..3abbf87 100644 --- a/src/report/data_schema/dependency.py +++ b/src/report/data_schema/dependency.py @@ -1,11 +1,163 @@ class Dependency: - def __init__(self, name, version, location, package_manager, language, licenses, purl): + def __init__(self) -> None: + pass + + ''' + def __init__(self, name:str, version:str, location:str, package_manager:str, language, licenses, purl): self.name = name self.version = version self.location = location self.package_manager = package_manager self.language = language self.licenses = licenses + self.purl = purl + ''' + + def get_name(self): + """ + Get the name attribute. + + Returns: + str: The name attribute. + """ + return self.name + + def set_name(self, name): + """ + Set the name attribute. + + Parameters: + name (str): The new name attribute. + + Returns: + None + """ + self.name = name + + def get_version(self): + """ + Get the version attribute. + + Returns: + str: The version attribute. + """ + return self.version + + def set_version(self, version): + """ + Set the version attribute. + + Parameters: + version (str): The new version attribute. + + Returns: + None + """ + self.version = version + + def get_location(self): + """ + Get the location attribute. + + Returns: + str: The location attribute. + """ + return self.location + + def set_location(self, location): + """ + Set the location attribute. + + Parameters: + location (str): The new location attribute. + + Returns: + None + """ + self.location = location + + def get_package_manager(self): + """ + Get the package_manager attribute. + + Returns: + str: The package_manager attribute. + """ + return self.package_manager + + def set_package_manager(self, package_manager): + """ + Set the package_manager attribute. + + Parameters: + package_manager (str): The new package_manager attribute. + + Returns: + None + """ + self.package_manager = package_manager + + def get_language(self): + """ + Get the language attribute. + + Returns: + str: The language attribute. + """ + return self.language + + def set_language(self, language): + """ + Set the language attribute. + + Parameters: + language (str): The new language attribute. + + Returns: + None + """ + self.language = language + + def get_licenses(self): + """ + Get the licenses attribute. + + Returns: + list: The licenses attribute. + """ + return self.licenses + + def set_licenses(self, licenses): + """ + Set the licenses attribute. + + Parameters: + licenses (list): The new licenses attribute. + + Returns: + None + """ + self.licenses = licenses + + def get_purl(self): + """ + Get the purl attribute. + + Returns: + str: The purl attribute. + """ + return self.purl + + def set_purl(self, purl): + """ + Set the purl attribute. + + Parameters: + purl (str): The new purl attribute. + + Returns: + None + """ self.purl = purl \ No newline at end of file diff --git a/src/report/data_schema/vulnerability.py b/src/report/data_schema/vulnerability.py index d58a17b..901d75b 100644 --- a/src/report/data_schema/vulnerability.py +++ b/src/report/data_schema/vulnerability.py @@ -1,7 +1,18 @@ class Vulnerability: - def __init__(self, name, description, identifier, severity, cvss, epss, category, rule, file, location, fix, link, tools): + def __init__(self): + """ + Initializes a new instance of the class. + Parameters: + None + Returns: + None + """ + pass + + ''' + def __init__(self, name: str, description:str, identifier:str, severity:str, cvss:float, epss:float, category:str, rule:str, file:str, location:str, fix: str, link:str, tools:list): self.name = name self.description = description self.identifier = identifier @@ -15,3 +26,270 @@ def __init__(self, name, description, identifier, severity, cvss, epss, category self.fix = fix self.link = link self.tools = tools + ''' + + def get_name(self): + """ + Get the name of the instance. + + Returns: + str: The name of the instance. + """ + return self.name + + def get_description(self): + """ + Returns the description of the object. + + :return: A string representing the description of the object. + """ + return self.description + + def get_identifier(self): + """ + Get the identifier of the object. + :return: The identifier of the object. + """ + return self.identifier + + def get_severity(self): + """ + Get the severity of the object. + Returns: + The severity of the object. + """ + return self.severity + + def get_cvss(self): + """ + Returns the value of the 'cvss' attribute. + + Returns: + The value of the 'cvss' attribute. + """ + return self.cvss + + def get_epss(self): + """ + Get the value of the `epss` attribute. + + Returns: + The value of the `epss` attribute. + """ + return self.epss + + def get_category(self): + """ + Returns the category of the object. + + :return: The category of the object. + """ + return self.category + + def get_rule(self): + """ + Get the rule associated with this object. + + Returns: + The rule associated with this object. + """ + return self.rule + + def get_file(self): + """ + Get the file associated with this object. + + :return: The file object. + """ + return self.file + + def get_location(self): + """ + Get the location. + + Returns: + The location of the object. + """ + return self.location + + def get_fix(self): + """ + A function that returns the value of the `fix` attribute. + + Returns: + fix (any): The value of the `fix` attribute. + """ + return self.fix + + def get_link(self): + """ + Get the link attribute of the current object. + + :return: The link attribute. + """ + return self.link + + def get_tools(self): + """ + This function returns the tools of the object. + + :return: The tools of the object. + """ + return self.tools + + # Setter methods + def set_name(self, name): + """ + Sets the name of the object. + + Parameters: + name (str): The name to set for the object. + + Returns: + None + """ + self.name = name + + def set_description(self, description): + """ + Set the description of the object. + + Parameters: + description (str): The description to set. + + Returns: + None + """ + self.description = description + + def set_identifier(self, identifier): + """ + Set the identifier of the object. + + Parameters: + identifier (str): The identifier to set. + + Returns: + None + """ + self.identifier = identifier + + def set_severity(self, severity): + """ + Set the severity of the object. + + Parameters: + severity (str): The severity to set. + + Returns: + None + """ + self.severity = severity + + def set_cvss(self, cvss): + """ + Set the CVSS score for the object. + + Parameters: + cvss (float): The CVSS score to set. + + Returns: + None + """ + self.cvss = cvss + + def set_epss(self, epss): + """ + Set the value of `epss`. + + Parameters: + epss (float): The new value for `epss`. + + Returns: + None + """ + self.epss = epss + + def set_category(self, category): + """ + Sets the category of the object. + + Parameters: + category (str): The category to set for the object. + + Returns: + None + """ + self.category = category + + def set_rule(self, rule): + """ + Set the rule for the object. + + Args: + rule (type): The rule to be set. + + Returns: + None + """ + self.rule = rule + + def set_file(self, file): + """ + Set the file attribute of the object. + + Parameters: + file (str): The file to set. + + Returns: + None + """ + self.file = file + + def set_location(self, location): + """ + Set the location of the object. + + Parameters: + location (str): The new location of the object. + + Returns: + None + """ + self.location = location + + def set_fix(self, fix): + """ + Set the value of the 'fix' attribute. + + Args: + fix (str): The new value for the 'fix' attribute. + + Returns: + None + """ + self.fix = fix + + def set_link(self, link): + """ + Set a link to the vulnerability. + + Args: + link (str): The new value for the 'link' attribute. + + Returns: + None + """ + self.link = link + + def set_tools(self, tools): + """ + Set the tools attribute of the class instance. + + Parameters: + tools (list): The list of tools to be set as the value of the tools attribute. + + Returns: + None + """ + self.tools = tools diff --git a/src/report/parsers/bandit_parser.py b/src/report/parsers/bandit_parser.py index 9aae49d..e36ee04 100644 --- a/src/report/parsers/bandit_parser.py +++ b/src/report/parsers/bandit_parser.py @@ -1,12 +1,39 @@ +from typing import List, Dict + from report.data_schema.vulnerability import Vulnerability CATEGORY = "Static Application Security Testing (SAST)" TOOL_NAME = "Bandit" -def parse_bandit_vulns(report: dict): - vulnerabilities = [] - for result in report.get('results'): - vulnerabilities.append(Vulnerability(result.get('issue_text'), result.get('issue_text') + ": " + result.get('filename'),"",result.get('issue_severity'), "N/A", "N/A", - CATEGORY, result.get('test_id'), result.get('filename'), - result.get('line_number'), "Not defined by the tool", result.get('more_info'),[TOOL_NAME])) - return vulnerabilities \ No newline at end of file +def parse_bandit_vulns(report: Dict[str, List[Dict[str, str]]]) -> List[Vulnerability]: + """ + Parse the bandit vulnerabilities from the given report. + + Args: + report (dict): The report containing the vulnerabilities. + + Returns: + List[Vulnerability]: A list of Vulnerability objects representing the parsed vulnerabilities. + """ + vulnerabilities: List[Vulnerability] = [] + for result in report.get('results',[]): + try: + vulnerability = Vulnerability() + vulnerability.set_name(result.get('issue_text')) + vulnerability.set_description(result.get('issue_text') + ": " + result.get('filename')) + vulnerability.set_severity(result.get('issue_severity')) + vulnerability.set_identifier('') + vulnerability.set_cvss(None) + vulnerability.set_epss(None) + vulnerability.set_category(CATEGORY) + vulnerability.set_rule(result.get('test_id')) + vulnerability.set_file(result.get('filename')) + vulnerability.set_location(result.get('line_number')) + vulnerability.set_fix("Not defined by the tool") + vulnerability.set_link(result.get('more_info')) + vulnerability.set_tools([TOOL_NAME]) + vulnerabilities.append(vulnerability) + except Exception as e: + print(f"Error parsing vulnerability: {e}") + + return vulnerabilities diff --git a/src/report/parsers/checkov_parser.py b/src/report/parsers/checkov_parser.py index 5c05e28..eaf3e32 100644 --- a/src/report/parsers/checkov_parser.py +++ b/src/report/parsers/checkov_parser.py @@ -1,22 +1,55 @@ from report.data_schema.vulnerability import Vulnerability +from typing import Dict, List TOOL_NAME = "Checkov" def _get_category(category): - if category == "terraform": - return "IaC code analysis" - elif category == "dockerfile": - return "Container analysis" - elif category == "secrets": - return "Secrets detection" - else: - return "" + """ + Retrieves the category description based on the given category. -def parse_checkov_vulns(report: dict): - vulnerabilities = [] - for check_type in report: - for vuln in check_type.get('results').get('failed_checks'): - vulnerabilities.append(Vulnerability(vuln.get('check_name'), vuln.get('check_id') + ": " + vuln.get('check_name'),vuln.get('check_id'),vuln.get('severity'),'N/A', "N/A", - _get_category(check_type.get('check_type')), vuln.get('check_id'), vuln.get('file_path'), - vuln.get('resource'),vuln.get('check_name'),vuln.get('guideline'),[TOOL_NAME])) - return vulnerabilities \ No newline at end of file + Parameters: + category (str): The category for which the description is to be retrieved. + + Returns: + str: The description of the category. If the category is not found, an empty string is returned. + """ + categories = { + "terraform": "IaC code analysis", + "dockerfile": "Container analysis", + "secrets": "Secrets detection", + } + return categories.get(category, "") + +def parse_checkov_vulns(report: Dict[str, Dict[str, Dict[str, List[Dict[str, str]]]]]) -> List[Vulnerability]: + """ + Parses the Checkov vulnerabilities report and returns a list of Vulnerability objects. + + Parameters: + report (Dict[str, Dict[str, Dict[str, List[Dict[str, str]]]]]): The Checkov vulnerabilities report as a dictionary. + + Returns: + List[Vulnerability]: A list of Vulnerability objects representing the parsed vulnerabilities. + """ + vulnerabilities: List[Vulnerability] = [] + for check_type, check_type_results in report.items(): + if 'results' in check_type_results and 'failed_checks' in check_type_results['results']: + for vuln in check_type_results['results']['failed_checks']: + try: + vulnerability = Vulnerability() + vulnerability.set_name(vuln['check_name']) + vulnerability.set_description(f"{vuln['check_id']}: {vuln['check_name']}") + vulnerability.set_identifier(vuln['check_id']) + vulnerability.set_severity(vuln['severity']) + vulnerability.set_cvss(None) + vulnerability.set_epss(None) + vulnerability.set_category(_get_category(check_type)) + vulnerability.set_rule(vuln['check_id']) + vulnerability.set_file(vuln['file_path']) + vulnerability.set_location(vuln['resource']) + vulnerability.set_fix(vuln['check_name']) + vulnerability.set_link(vuln['guideline']) + vulnerability.set_tools([TOOL_NAME]) + vulnerabilities.append(vulnerability) + except KeyError: + continue + return vulnerabilities diff --git a/src/report/parsers/findsecbugs_parser.py b/src/report/parsers/findsecbugs_parser.py index 938b23b..cbd5a2a 100644 --- a/src/report/parsers/findsecbugs_parser.py +++ b/src/report/parsers/findsecbugs_parser.py @@ -1,23 +1,96 @@ +from typing import List, Dict + from report.data_schema.vulnerability import Vulnerability CATEGORY = "Static Application Security Testing (SAST)" TOOL_NAME = "FindSecBugs" +FIELDS_NOT_DEFINED = "Not defined by the tool" def _get_findsecbugs_severity(level): - if level == "warning": - return "HIGH" - elif level == "error": - return "CRITICAL" - else: - return level - -def parse_findsecbugs_vulns(report: dict): - vulnerabilities = [] + """ + Get the severity level for FindSecBugs. + + :param level: The level of severity for FindSecBugs. + :return: The corresponding severity level for FindSecBugs. + """ + return { + "warning": "HIGH", + "error": "CRITICAL" + }.get(level, level) + +def get_physical_location(item): + """ + Retrieve the physical location of an item. + + Parameters: + item (dict): A dictionary representing an item. + + Returns: + str or None: The URI of the artifact location if it exists, None otherwise. + """ + physical_location = item.get('physicalLocation') + if physical_location is not None: + artifact_location = physical_location.get('artifactLocation') + if artifact_location is not None: + return artifact_location.get('uri') + return None + +def get_region_start_line(item): + """ + Get the start line of the region from the given item. + + Args: + item (dict): The item containing the physical location. + + Returns: + int or None: The start line of the region, or None if not found. + """ + physical_location = item.get('physicalLocation') + if physical_location is not None: + region = physical_location.get('region') + if region is not None: + return region.get('startLine') + return None + +def parse_findsecbugs_vulns(report: Dict[str, List[Dict[str, object]]]) -> List[Vulnerability]: + """ + Parses the FindSecBugs vulnerabilities from the given report. + + Args: + report (Dict[str, List[Dict[str, object]]]): The report containing the FindSecBugs vulnerabilities. + + Returns: + List[Vulnerability]: A list of Vulnerability objects representing the parsed vulnerabilities. + """ + + vulnerabilities: List[Vulnerability] = [] for run in report.get('runs'): - for result in run.get('results'): - files = [item.get('physicalLocation').get('artifactLocation').get('uri') for item in result.get('locations') if item.get('physicalLocation') is not None and item.get('physicalLocation').get('artifactLocation') is not None] - locations = [item.get('physicalLocation').get('region').get('startLine') for item in result.get('locations') if item.get('physicalLocation') is not None and item.get('physicalLocation').get('region') is not None] - vulnerabilities.append(Vulnerability(result.get('message').get('text'), result.get('ruleId') + ": " + result.get('message').get('text'),"Not defined by the tool", - _get_findsecbugs_severity(result.get('level')), "N/A", "N/A", CATEGORY, result.get('ruleId'), files[0] if files else "Not detected", - locations[0] if locations else 0, "Not defined by the tool", "Not defined by the tool",[TOOL_NAME])) - return vulnerabilities \ No newline at end of file + for result in run.get('results',[]): + try: + files = [get_physical_location(item) for item in result.get('locations') if get_physical_location(item) is not None] + locations = [get_region_start_line(item) for item in result.get('locations') if get_region_start_line(item) is not None] + message = result.get('message').get('text') + rule_id = result.get('ruleId') + severity = _get_findsecbugs_severity(result.get('level')) + file = files[0] if files else "Not detected" + location = locations[0] if locations else 0 + fix = FIELDS_NOT_DEFINED + link = FIELDS_NOT_DEFINED + vulnerability = Vulnerability() + vulnerability.set_name(message) + vulnerability.set_description(rule_id + ": " + message) + vulnerability.set_identifier(rule_id) + vulnerability.set_severity(severity) + vulnerability.set_cvss(None) + vulnerability.set_epss(None) + vulnerability.set_category(CATEGORY) + vulnerability.set_rule(rule_id) + vulnerability.set_file(file) + vulnerability.set_location(location) + vulnerability.set_fix(fix) + vulnerability.set_link(link) + vulnerability.set_tools([TOOL_NAME]) + vulnerabilities.append(vulnerability) + except Exception as e: + print(f"Error parsing vulnerability: {e}") + return vulnerabilities diff --git a/src/report/parsers/gitleaks_parser.py b/src/report/parsers/gitleaks_parser.py index 370b8db..ef456d1 100644 --- a/src/report/parsers/gitleaks_parser.py +++ b/src/report/parsers/gitleaks_parser.py @@ -1,14 +1,39 @@ +from typing import Dict, List + from report.data_schema.vulnerability import Vulnerability CATEGORY = "Secrets detection" TOOL_NAME = "GitLeaks" -def parse_gitleaks_vulns(report: dict): - vulnerabilities = [] +def parse_gitleaks_vulns(report: List[Dict[str, str]]) -> List[Vulnerability]: + """ + Parses the gitleaks vulnerabilities report and creates a list of Vulnerability objects. + + Args: + report (List[Dict[str, str]]): The gitleaks vulnerabilities report as a list of dictionaries. + + Returns: + List[Vulnerability]: A list of Vulnerability objects representing the parsed vulnerabilities. + """ + vulnerabilities: List[Vulnerability] = [] for result in report: - name = 'Secret detected in ' + result.get('File') - vulnerabilities.append(Vulnerability(name, result.get('Description'),"CWE-798","High", "N/A", "N/A", - CATEGORY, result.get('RuleID'), result.get('File'), - result.get('EndLine'),"Remove and rotate the hardcoded secret", - "https://cwe.mitre.org/data/definitions/798.html",[TOOL_NAME])) - return vulnerabilities \ No newline at end of file + try: + name = 'Secret detected in ' + result.get('File') + vulnerability = Vulnerability() + vulnerability.set_name(name) + vulnerability.set_description(result.get('Description')) + vulnerability.set_identifier("CWE-798") + vulnerability.set_severity("High") + vulnerability.set_cvss(None) + vulnerability.set_epss(None) + vulnerability.set_category(CATEGORY) + vulnerability.set_rule(result.get('RuleID')) + vulnerability.set_file(result.get('File')) + vulnerability.set_location(result.get('EndLine')) + vulnerability.set_fix("Remove and rotate the hardcoded secret") + vulnerability.set_link("https://cwe.mitre.org/data/definitions/798.html") + vulnerability.set_tools([TOOL_NAME]) + vulnerabilities.append(vulnerability) + except Exception as e: + print(f"Error parsing vulnerability: {e}") + return vulnerabilities diff --git a/src/report/parsers/osv_scanner_parser.py b/src/report/parsers/osv_scanner_parser.py index 8d94b22..ee2694f 100644 --- a/src/report/parsers/osv_scanner_parser.py +++ b/src/report/parsers/osv_scanner_parser.py @@ -1,39 +1,86 @@ +from typing import List, Dict + from report.data_schema.vulnerability import Vulnerability CATEGORY = "Software Compose Analysis (SCA)" TOOL_NAME = "OSV Scanner" def _get_real_severity(severity): - if severity == "MODERATE" or severity == "HIGH": - return "High" - else: - return severity + """ + Return the real severity based on the input severity. + + Parameters: + severity (str): The input severity value. + + Returns: + str: The real severity value. + """ + return "High" if severity in ("MODERATE", "HIGH") else severity def _get_fix_version(affected_versions): + """ + Returns a list of fix versions based on the provided affected versions. + + Parameters: + affected_versions (list): A list of affected versions. + + Returns: + list: A list of fix versions. + """ fix_version = [] for version in affected_versions: - for range in version.get('ranges'): - for event in range.get('events'): - if event.get('fixed') and event.get('fixed') not in fix_version: - fix_version.append(event.get('fixed')) + for range in version['ranges']: + for event in range['events']: + fixed = event.get('fixed') + if fixed and fixed not in fix_version: + fix_version.append(fixed) return fix_version -def parse_osv_scanner_vulns(report: dict): +def parse_osv_scanner_vulns(report: Dict[str, List[Dict[str, List[Dict[str, str]]]]]) -> List[Vulnerability]: + """ + Parse the vulnerabilities from the OSV scanner report. + + Args: + report (dict): The OSV scanner report. + + Returns: + list: A list of Vulnerability objects. + """ vulnerabilities = [] - if report.get('results'): - for result in report.get('results'): - for package in result.get('packages'): - for vuln in package.get('vulnerabilities'): - vuln_id = vuln.get('id') - if vuln.get('aliases'): - for v in vuln.get('aliases'): - if v.startswith('CVE-'): - vuln_id = v - fix_version = _get_fix_version(vuln.get('affected')) - vuln_severity = _get_real_severity(vuln.get('database_specific').get('severity')) - name = vuln_id + " (" + package.get('package').get('name') + "): " + vuln.get('summary') - vulnerabilities.append(Vulnerability(name, vuln.get('details'),vuln_id, vuln_severity, vuln.get('severity')[0].get('score'), "N/A", - CATEGORY, "N/A", result.get('source').get('path'), - package.get('package').get('name') + "@" + package.get('package').get('version'), - fix_version,vuln.get('references')[0].get('url'),[TOOL_NAME])) - return vulnerabilities \ No newline at end of file + results = report.get('results',[]) + if results: + vulns = [vuln + for result in results + for package in result.get('packages') + for vuln in package.get('vulnerabilities')] + + for vuln in vulns: + vuln_id = next((alias for alias in vuln.get('aliases') if alias.startswith('CVE-')), vuln.get('id')) + affected = vuln.get('affected') + fix_version = _get_fix_version(affected) + db_specific = vuln.get('database_specific') + vuln_severity = _get_real_severity(db_specific.get('severity')) + package_name = vuln.get('package', {}).get('name') + summary = vuln.get('summary') + details = vuln.get('details') + references = vuln.get('references') + severity = vuln['severity'][0]['score'] + + vulnerability = Vulnerability() + vulnerability.set_name(f"{vuln_id} ({package_name}): {summary}") + vulnerability.set_description(details) + vulnerability.set_identifier(vuln_id) + vulnerability.set_severity(vuln_severity) + vulnerability.set_cvss(severity) + vulnerability.set_epss(None) + vulnerability.set_category(CATEGORY) + vulnerability.set_rule("N/A") + vulnerability.set_file(vuln.get('source', {}).get('path')) + vulnerability.set_location("N/A") + vulnerability.set_fix(fix_version) + vulnerability.set_link(references[0]['url']) + vulnerability.set_tools([TOOL_NAME]) + + vulnerabilities.append(vulnerability) + + return vulnerabilities diff --git a/src/report/parsers/semgrep_parser.py b/src/report/parsers/semgrep_parser.py index 9eb5a3a..d41002e 100644 --- a/src/report/parsers/semgrep_parser.py +++ b/src/report/parsers/semgrep_parser.py @@ -5,16 +5,31 @@ def parse_semgrep_vulns(report: dict): vulnerabilities = [] - for result in report.get('results'): - name = result.get('extra').get('metadata').get('vulnerability_class')[0] + ' detected in your code' - location = "" - if result.get('end') and result.get('start').get('line'): - location = result.get('start').get('line') - elif result.get('end') and result.get('end').get('line'): - location = result.get('end').get('line') - else: - location = "Not detected" - vulnerabilities.append(Vulnerability(name, result.get('extra').get('message'),result.get('extra').get('metadata').get('cwe')[0],result.get('extra').get('metadata').get('impact'),"N/A","N/A", - CATEGORY, result.get('check_id'), result.get('path'), - location,"Not defined by the tool",result.get('extra').get('metadata').get('references')[0],[TOOL_NAME])) + for result in report.get('results',[]): + try: + vulnerability = Vulnerability() + vulnerability.set_name(result.get('extra').get('metadata').get('vulnerability_class')[0] + ' detected in your code') + vulnerability.set_location("") + if result.get('end') and result.get('start').get('line'): + vulnerability.set_location(result.get('start').get('line')) + elif result.get('end') and result.get('end').get('line'): + vulnerability.set_location(result.get('end').get('line')) + else: + vulnerability.set_location("Not detected") + vulnerability.set_description(result.get('extra').get('message')) + vulnerability.set_identifier(result.get('extra').get('metadata').get('cwe')[0]) + vulnerability.set_severity(result.get('extra').get('metadata').get('impact')) + vulnerability.set_cvss(None) + vulnerability.set_epss(None) + vulnerability.set_category(CATEGORY) + vulnerability.set_rule(result.get('check_id')) + vulnerability.set_file(result.get('path')) + vulnerability.set_fix("Not defined by the tool") + vulnerability.set_references([result.get('extra').get('metadata').get('references')[0]]) + vulnerability.set_tools([TOOL_NAME]) + + vulnerabilities.append(vulnerability) + except Exception as e: + print(f"Error parsing vulnerability: {e}") + return vulnerabilities \ No newline at end of file diff --git a/src/report/parsers/syft_parser.py b/src/report/parsers/syft_parser.py index be885e9..1a92d40 100644 --- a/src/report/parsers/syft_parser.py +++ b/src/report/parsers/syft_parser.py @@ -1,8 +1,26 @@ +from typing import List, Dict + from report.data_schema.dependency import Dependency -def parse_syft_vulns(report: dict): - dependencies = [] - for artifact in report.get('artifacts'): - dependencies.append(Dependency(artifact.get('name'), artifact.get('version'),[location.get('path') for location in artifact.get('locations')], - artifact.get('type'),artifact.get('language'),[lic.get('value') for lic in artifact.get('licenses')],artifact.get('purl'))) - return dependencies \ No newline at end of file +def parse_syft_vulns(report: Dict[str, List[Dict[str, str]]]) -> List[Dependency]: + """ + Parse the given `report` dictionary and extract information about vulnerabilities in the artifacts. + + Args: + report (Dict[str, List[Dict[str, str]]]): A dictionary containing information about vulnerabilities in the artifacts. + + Returns: + List[Dependency]: A list of Dependency objects representing the parsed vulnerabilities. + """ + dependencies: List[Dependency] = [] + for artifact in report.get('artifacts',[]): + dependency = Dependency() + dependency.set_name(artifact.get('name')) + dependency.set_version(artifact.get('version')) + dependency.set_location([location.get('path') for location in artifact.get('locations')]) + dependency.set_package_manager(artifact.get('type')) + dependency.set_language(artifact.get('language')) + dependency.set_licenses([lic.get('value') for lic in artifact.get('licenses')]) + dependency.set_purl(artifact.get('purl')) + dependencies.append(dependency) + return dependencies diff --git a/src/report/parsers/trivy_container_parser.py b/src/report/parsers/trivy_container_parser.py index e6e7045..69feea4 100644 --- a/src/report/parsers/trivy_container_parser.py +++ b/src/report/parsers/trivy_container_parser.py @@ -1,14 +1,44 @@ +from typing import List, Dict + from report.data_schema.vulnerability import Vulnerability CATEGORY = "Container analysis" TOOL_NAME = "Trivy" -def parse_trivy_container_vulns(report: dict): - vulnerabilities = [] - for source in report.get('Results'): - for vuln in source.get('Vulnerabilities'): - name = vuln.get('VulnerabilityID') + " (" + vuln.get('PkgName') + "): " + vuln.get('Title') - vulnerabilities.append(Vulnerability(name, vuln.get('Description'),vuln.get('VulnerabilityID'),vuln.get('Severity'),vuln.get('CVSS').get(list(vuln.get('CVSS').keys())[0]).get('V3Score'), "N/A", - CATEGORY, "N/A", source.get('Target'), - vuln.get('PkgName') + "@" + vuln.get('InstalledVersion'),vuln.get('FixedVersion'),vuln.get('PrimaryURL'),[TOOL_NAME])) - return vulnerabilities \ No newline at end of file +NOT_EXISTING_DICT_CVSS = { + "CVSS": { + "ANY_KEY": { + "V3Score": None + } + } +} + +def parse_trivy_container_vulns(report: Dict[str, List[Dict[str, str]]]) -> List[Vulnerability]: + """ + Parses the Trivy container vulnerabilities report and returns a list of Vulnerability objects. + + Args: + report (Dict[str, List[Dict[str, str]]]): The Trivy container vulnerabilities report. + + Returns: + List[Vulnerability]: A list of Vulnerability objects representing the vulnerabilities found in the report. + """ + vulnerabilities: List[Vulnerability] = [] + for source in report.get('Results',[]): + for vuln in source.get('Vulnerabilities',[]): + vulnerability = Vulnerability() + vulnerability.set_name(vuln.get('VulnerabilityID') + " (" + vuln.get('PkgName') + "): " + vuln.get('Title')) + vulnerability.set_description(vuln.get('Description')) + vulnerability.set_identifier(vuln.get('VulnerabilityID')) + vulnerability.set_severity(vuln.get('Severity')) + vulnerability.set_cvss((vuln.get('CVSS').get(list(vuln.get('CVSS').keys())[0]) if len(list(vuln.get('CVSS', {}).keys())) > 0 else {}).get('V3Score', None)) + vulnerability.set_epss(None) + vulnerability.set_category(CATEGORY) + vulnerability.set_rule("N/A") + vulnerability.set_file(source.get('Target')) + vulnerability.set_location("N/A") + vulnerability.set_fix(vuln.get('FixedVersion')) + vulnerability.set_link(vuln.get('PrimaryURL')) + vulnerability.set_tools([TOOL_NAME]) + vulnerabilities.append(vulnerability) + return vulnerabilities diff --git a/src/report/parsers/trivy_sca_parser.py b/src/report/parsers/trivy_sca_parser.py index 7c14f16..c54cf8c 100644 --- a/src/report/parsers/trivy_sca_parser.py +++ b/src/report/parsers/trivy_sca_parser.py @@ -3,13 +3,40 @@ CATEGORY = "Software Compose Analysis (SCA)" TOOL_NAME = "Trivy" +DICT_WITHOUT_CVSS = { + "nvd": { + "V3Score": None + } +} + def parse_trivy_sca_vulns(report: dict): + """ + Parses the Trivy SCA vulnerabilities report and extracts the relevant information into a list of Vulnerability objects. + + Args: + report (dict): The Trivy vulnerabilities report. + + Returns: + list: A list of Vulnerability objects containing the extracted information. + + """ vulnerabilities = [] - if report.get('Results'): - for source in report.get('Results'): - for vuln in source.get('Vulnerabilities'): - name = vuln.get('VulnerabilityID') + " (" + vuln.get('PkgName') + "): " + vuln.get('Title') - vulnerabilities.append(Vulnerability(name, vuln.get('Description'),vuln.get('VulnerabilityID'),vuln.get('Severity'),vuln.get('CVSS').get('nvd').get('V3Score'), "N/A", - CATEGORY, "N/A", source.get('Target'), - vuln.get('PkgName') + "@" + vuln.get('InstalledVersion'),vuln.get('FixedVersion'),vuln.get('PrimaryURL'),[TOOL_NAME])) + if report.get('Results',[]): + for source in report.get('Results',[]): + for vuln in source.get('Vulnerabilities',[]): + vulnerability = Vulnerability() + vulnerability.set_name(vuln.get('VulnerabilityID') + " (" + vuln.get('PkgName') + "): " + vuln.get('Title')) + vulnerability.set_description(vuln.get('Description')) + vulnerability.set_identifier(vuln.get('VulnerabilityID')) + vulnerability.set_severity(vuln.get('Severity')) + vulnerability.set_cvss(vuln.get('CVSS',DICT_WITHOUT_CVSS).get('nvd').get('V3Score')) + vulnerability.set_epss(None) + vulnerability.set_category(CATEGORY) + vulnerability.set_rule("N/A") + vulnerability.set_file(source.get('Target')) + vulnerability.set_location("N/A") + vulnerability.set_fix(vuln.get('FixedVersion')) + vulnerability.set_link(vuln.get('PrimaryURL')) + vulnerability.set_tools([TOOL_NAME]) + vulnerabilities.append(vulnerability) return vulnerabilities \ No newline at end of file diff --git a/src/report/process/group_vulnerabilities.py b/src/report/process/group_vulnerabilities.py index 0098f63..a6f769e 100644 --- a/src/report/process/group_vulnerabilities.py +++ b/src/report/process/group_vulnerabilities.py @@ -1,72 +1,137 @@ +from typing import List, Dict +from report.data_schema.vulnerability import Vulnerability -def group_sast_vulnerabilities(vulnerabilities): - grouped_vulnerabilities = [] +def group_sast_vulnerabilities(vulnerabilities: List[Vulnerability]) -> List[Dict[str, str]]: + """ + Group SAST vulnerabilities based on file and location. + + Args: + vulnerabilities: A list of Vulnerability objects representing individual vulnerabilities. + + Returns: + A list of dictionaries representing grouped vulnerabilities, with each dictionary containing the attributes of a Vulnerability object. + + """ + grouped_vulnerabilities: List[Vulnerability] = [] + items_to_append = [] for vuln in vulnerabilities: - if grouped_vulnerabilities != []: - for grouped_vuln in grouped_vulnerabilities: - if (grouped_vuln.file == vuln.file) and (grouped_vuln.location == vuln.location): - if vuln.tools[0] not in grouped_vuln.tools: - grouped_vuln.tools.append(vuln.tools) - grouped_vulnerabilities.append(vuln) - else: - grouped_vulnerabilities.append(vuln) + for grouped_vuln in grouped_vulnerabilities: + if grouped_vuln.file == vuln.file and grouped_vuln.location == vuln.location: + if vuln.tools[0] not in grouped_vuln.tools: + items_to_append.append((grouped_vuln, vuln.tools[0])) + break + grouped_vulnerabilities.append(vuln) + for grouped_vuln, tool in items_to_append: + grouped_vuln.tools.append(tool) return [vuln.__dict__ for vuln in grouped_vulnerabilities] - #return grouped_vulnerabilities -def group_sca_vulnerabilities(vulnerabilities): - grouped_vulnerabilities = [] + +def group_sca_vulnerabilities(vulnerabilities: List[Vulnerability]) -> List[Dict[str, str]]: + """ + Group SCA vulnerabilities based on identifier, file, and location. + + Args: + vulnerabilities: A list of Vulnerability objects representing individual vulnerabilities. + + Returns: + A list of dictionaries representing grouped vulnerabilities, with each dictionary containing the attributes of a Vulnerability object. + """ + grouped_vulnerabilities: List[Vulnerability] = [] + items_to_append: List[tuple] = [] for vuln in vulnerabilities: - if grouped_vulnerabilities != []: - for grouped_vuln in grouped_vulnerabilities: - if (grouped_vuln.identifier == vuln.identifier) and (grouped_vuln.file == vuln.file) and (grouped_vuln.location == vuln.location): - if vuln.tools[0] not in grouped_vuln.tools: - grouped_vuln.tools.append(vuln.tools) - grouped_vulnerabilities.append(vuln) - else: - grouped_vulnerabilities.append(vuln) + for grouped_vuln in grouped_vulnerabilities: + if ( + grouped_vuln.identifier == vuln.identifier + and grouped_vuln.file == vuln.file + and grouped_vuln.location == vuln.location + ): + if vuln.tools[0] not in grouped_vuln.tools: + items_to_append.append((grouped_vuln, vuln.tools[0])) + break + grouped_vulnerabilities.append(vuln) + for grouped_vuln, tool in items_to_append: + grouped_vuln.tools.append(tool) return [vuln.__dict__ for vuln in grouped_vulnerabilities] - #return grouped_vulnerabilities -def group_container_vulnerabilities(vulnerabilities): - grouped_vulnerabilities = [] + +def group_container_vulnerabilities(vulnerabilities: List[Vulnerability]) -> List[Dict[str, str]]: + """ + Group container vulnerabilities based on identifier, file, and location. + + Args: + vulnerabilities: A list of Vulnerability objects representing individual vulnerabilities. + + Returns: + A list of dictionaries representing grouped vulnerabilities, with each dictionary containing the attributes of a Vulnerability object. + """ + grouped_vulnerabilities: List[Vulnerability] = [] + items_to_append: List[tuple] = [] for vuln in vulnerabilities: - if grouped_vulnerabilities != []: - for grouped_vuln in grouped_vulnerabilities: - if (grouped_vuln.identifier == vuln.identifier) and (grouped_vuln.file == vuln.file) and (grouped_vuln.location == vuln.location): - if vuln.tools[0] not in grouped_vuln.tools: - grouped_vuln.tools.append(vuln.tools) - grouped_vulnerabilities.append(vuln) - else: - grouped_vulnerabilities.append(vuln) + for grouped_vuln in grouped_vulnerabilities: + if ( + grouped_vuln.identifier == vuln.identifier + and grouped_vuln.file == vuln.file + and grouped_vuln.location == vuln.location + ): + if vuln.tools[0] not in grouped_vuln.tools: + items_to_append.append((grouped_vuln, vuln.tools[0])) + break + grouped_vulnerabilities.append(vuln) + for grouped_vuln, tool in items_to_append: + grouped_vuln.tools.append(tool) return [vuln.__dict__ for vuln in grouped_vulnerabilities] - #return grouped_vulnerabilities -def group_iac_vulnerabilities(vulnerabilities): - grouped_vulnerabilities = [] +def group_iac_vulnerabilities(vulnerabilities: List[Vulnerability]) -> List[Dict[str, str]]: + """ + Group IAC vulnerabilities based on identifier, file, and location. + + Args: + vulnerabilities: A list of Vulnerability objects representing individual vulnerabilities. + + Returns: + A list of dictionaries representing grouped vulnerabilities, with each dictionary containing the attributes of a Vulnerability object. + """ + grouped_vulnerabilities: List[Vulnerability] = [] + items_to_append: List[tuple] = [] for vuln in vulnerabilities: - if grouped_vulnerabilities != []: - for grouped_vuln in grouped_vulnerabilities: - if (grouped_vuln.identifier == vuln.identifier) and (grouped_vuln.file == vuln.file) and (grouped_vuln.location == vuln.location): - if vuln.tools[0] not in grouped_vuln.tools: - grouped_vuln.tools.append(vuln.tools) - grouped_vulnerabilities.append(vuln) - else: - grouped_vulnerabilities.append(vuln) + for grouped_vuln in grouped_vulnerabilities: + if ( + grouped_vuln.identifier == vuln.identifier + and grouped_vuln.file == vuln.file + and grouped_vuln.location == vuln.location + ): + if vuln.tools[0] not in grouped_vuln.tools: + items_to_append.append((grouped_vuln, vuln.tools[0])) + break + grouped_vulnerabilities.append(vuln) + for grouped_vuln, tool in items_to_append: + grouped_vuln.tools.append(tool) return [vuln.__dict__ for vuln in grouped_vulnerabilities] - #return grouped_vulnerabilities -def group_secrets_vulnerabilities(vulnerabilities): - grouped_vulnerabilities = [] +def group_secrets_vulnerabilities(vulnerabilities: List[Vulnerability]) -> List[Dict[str, str]]: + """ + Group secrets vulnerabilities based on identifier, file, and location. + + Args: + vulnerabilities: A list of Vulnerability objects representing individual vulnerabilities. + + Returns: + A list of dictionaries representing grouped vulnerabilities, with each dictionary containing the attributes of a Vulnerability object. + """ + grouped_vulnerabilities: List[Vulnerability] = [] + items_to_append: List[tuple] = [] for vuln in vulnerabilities: - if grouped_vulnerabilities != []: - for grouped_vuln in grouped_vulnerabilities: - if (grouped_vuln.file == vuln.file) and (grouped_vuln.location == vuln.location): - if vuln.tools[0] not in grouped_vuln.tools: - grouped_vuln.tools.append(vuln.tools) - grouped_vulnerabilities.append(vuln) - else: - grouped_vulnerabilities.append(vuln) - return [vuln.__dict__ for vuln in grouped_vulnerabilities] - #return grouped_vulnerabilities \ No newline at end of file + for grouped_vuln in grouped_vulnerabilities: + if ( + grouped_vuln.file == vuln.file + and grouped_vuln.location == vuln.location + ): + if vuln.tools[0] not in grouped_vuln.tools: + items_to_append.append((grouped_vuln, vuln.tools[0])) + break + grouped_vulnerabilities.append(vuln) + for grouped_vuln, tool in items_to_append: + grouped_vuln.tools.append(tool) + return [vuln.__dict__ for vuln in grouped_vulnerabilities] \ No newline at end of file diff --git a/src/report/process/process_vulnerabilities.py b/src/report/process/process_vulnerabilities.py index 2c7f53f..51b2b71 100644 --- a/src/report/process/process_vulnerabilities.py +++ b/src/report/process/process_vulnerabilities.py @@ -3,6 +3,7 @@ import re import requests from cvss import CVSS3 +from typing import List, Callable, Any, Dict from ..parsers.syft_parser import parse_syft_vulns @@ -15,57 +16,76 @@ from ..parsers.bandit_parser import parse_bandit_vulns from ..parsers.findsecbugs_parser import parse_findsecbugs_vulns -def process_vulns(output_path, disable_apis): - vulnerabilities = [] - if os.path.exists(os.path.normpath(output_path + "/security-tools/semgrep/semgrep_results.json")): - with open(os.path.normpath(output_path + "/security-tools/semgrep/semgrep_results.json"), 'r') as semgrep_file: - vulnerabilities.append(parse_semgrep_vulns(json.loads(semgrep_file.read()))) - if os.path.exists(os.path.normpath(output_path + "/security-tools/trivy-sca/trivy-sca_results.json")): - with open(os.path.normpath(output_path + "/security-tools/trivy-sca/trivy-sca_results.json"), 'r') as trivy_sca_file: - vulnerabilities.append(parse_trivy_sca_vulns(json.loads(trivy_sca_file.read()))) - if os.path.exists(os.path.normpath(output_path + "/security-tools/trivy-container/trivy-container_results.json")): - with open(os.path.normpath(output_path + "/security-tools/trivy-container/trivy-container_results.json"), 'r') as trivy_container_file: - vulnerabilities.append(parse_trivy_container_vulns(json.loads(trivy_container_file.read()))) - if os.path.exists(os.path.normpath(output_path + "/security-tools/gitleaks/gitleaks_results.json")): - with open(os.path.normpath(output_path + "/security-tools/gitleaks/gitleaks_results.json"), 'r') as gitleaks_file: - vulnerabilities.append(parse_gitleaks_vulns(json.loads(gitleaks_file.read()))) - if os.path.exists(os.path.normpath(output_path + "/security-tools/checkov/results_json.json")): - with open(os.path.normpath(output_path + "/security-tools/checkov/results_json.json"), 'r') as checkov_file: - vulnerabilities.append(parse_checkov_vulns(json.loads(checkov_file.read()))) - if os.path.exists(os.path.normpath(output_path + "/security-tools/osv-scanner/osv-scanner_results.json")): - with open(os.path.normpath(output_path + "/security-tools/osv-scanner/osv-scanner_results.json"), 'r') as osv_scanner_file: - if os.stat(output_path + "/security-tools/osv-scanner/osv-scanner_results.json").st_size == 0: - osv_data = {} - else: - osv_data = json.loads(osv_scanner_file.read()) - vulnerabilities.append(parse_osv_scanner_vulns(osv_data)) - if os.path.exists(os.path.normpath(output_path + "/security-tools/bandit/bandit_results.json")): - with open(os.path.normpath(output_path + "/security-tools/bandit/bandit_results.json"), 'r') as bandit_file: - vulnerabilities.append(parse_bandit_vulns(json.loads(bandit_file.read()))) - if os.path.exists(os.path.normpath(output_path + "/security-tools/find-sec-bugs/findsecbugs_results.sarif")): - with open(os.path.normpath(output_path + "/security-tools/find-sec-bugs/findsecbugs_results.sarif"), 'r') as findsecbugs_file: - vulnerabilities.append(parse_findsecbugs_vulns(json.loads(findsecbugs_file.read()))) +def _check_and_parse(file_path: str, parse_function: Callable[[str], Any], vulnerabilities: List[Any]) -> None: + """ + Check if the file path exists and parse the file contents using the provided parse function. + Args: + file_path: The path to the file to be checked and parsed. + parse_function: The function used to parse the file contents. + vulnerabilities: A list to store the parsed vulnerabilities. + + Returns: + None + """ + if os.path.exists(file_path): + with open(file_path, 'r') as file: + vulnerabilities.append(parse_function(json.loads(file.read()))) + +def process_vulns(output_path: str, disable_apis: bool) -> List: + """ + Process the vulnerabilities from various security tools and return a list of vulnerabilities. + + Args: + output_path: The path to the output directory where the security tool results are stored. + disable_apis: A flag indicating whether to disable certain APIs. + + Returns: + A list of vulnerabilities. + """ + vulnerabilities: List = [] + + _check_and_parse(os.path.normpath(output_path + "/security-tools/semgrep/semgrep_results.json"), parse_semgrep_vulns, vulnerabilities) + _check_and_parse(os.path.normpath(output_path + "/security-tools/trivy-sca/trivy-sca_results.json"), parse_trivy_sca_vulns, vulnerabilities) + _check_and_parse(os.path.normpath(output_path + "/security-tools/trivy-container/trivy-container_results.json"), parse_trivy_container_vulns, vulnerabilities) + _check_and_parse(os.path.normpath(output_path + "/security-tools/gitleaks/gitleaks_results.json"), parse_gitleaks_vulns, vulnerabilities) + _check_and_parse(os.path.normpath(output_path + "/security-tools/checkov/results_json.json"), parse_checkov_vulns, vulnerabilities) + _check_and_parse(os.path.normpath(output_path + "/security-tools/osv-scanner/osv-scanner_results.json"), parse_osv_scanner_vulns, vulnerabilities) + _check_and_parse(os.path.normpath(output_path + "/security-tools/bandit/bandit_results.json"), parse_bandit_vulns, vulnerabilities) + _check_and_parse(os.path.normpath(output_path + "/security-tools/find-sec-bugs/findsecbugs_results.sarif"), parse_findsecbugs_vulns, vulnerabilities) + if disable_apis: pattern = re.compile("^CVE-[0-9-]+") pattern_cve_string = re.compile("^CVSS:3.*") + print('Querying vulnerabilities EPSS...') for item in [item for sublist in vulnerabilities for item in sublist]: - if item.identifier and pattern.match(item.identifier): - if pattern_cve_string.match(str(item.cvss)): - c = CVSS3(item.cvss) - item.cvss = max(list(c.scores())) + if item.get_identifier() and pattern.match(item.get_identifier()): + if pattern_cve_string.match(str(item.get_cvss())): + c = CVSS3(item.get_cvss()) + item.set_cvss(max(list(c.scores()))) idx = list(c.scores()).index(max(list(c.scores()))) - item.severity = list(c.severities())[idx].upper() - epss_response = requests.get(f"https://api.first.org/data/v1/epss?cve={item.identifier}") + item.set_severity(list(c.severities())[idx].upper()) + epss_response = requests.get(f"https://api.first.org/data/v1/epss?cve={item.get_identifier()}") if epss_response.status_code == 200: epss_object = epss_response.json() - item.epss = epss_object.get('data')[0].get('epss') + if epss_object.get('data'): + item.set_epss(epss_object.get('data')[0].get('epss')) return [item for sublist in vulnerabilities for item in sublist] -def process_deps(output_path): - dependencies = [] - if os.path.exists(os.path.normpath(output_path + "/security-tools/syft/syft_results.json")): - with open(os.path.normpath(output_path + "/security-tools/syft/syft_results.json"), 'r') as syft_file: - dependencies.append(parse_syft_vulns(json.loads(syft_file.read()))) - return [item.__dict__ for sublist in dependencies for item in sublist] \ No newline at end of file +def process_deps(output_path: str) -> List[Dict[str, str]]: + """ + Generates a list of dependencies by parsing the results of the Syft security tool. + + Args: + output_path: The path to the output directory. + + Returns: + A list of dictionaries representing the dependencies and their properties. + """ + syft_results_path = os.path.join(output_path, "security-tools", "syft", "syft_results.json") + if os.path.exists(syft_results_path): + with open(syft_results_path, 'r') as syft_file: + dependencies = parse_syft_vulns(json.load(syft_file)) + return [item.__dict__ for item in dependencies] + return [] diff --git a/src/report/report.py b/src/report/report.py index 8f6424f..a2bee07 100644 --- a/src/report/report.py +++ b/src/report/report.py @@ -1,20 +1,29 @@ import json import os +from typing import Dict, List from .process.process_vulnerabilities import process_deps, process_vulns from .process.group_vulnerabilities import group_sast_vulnerabilities, group_secrets_vulnerabilities, group_sca_vulnerabilities, group_iac_vulnerabilities, group_container_vulnerabilities +from report.data_schema.vulnerability import Vulnerability +from .data_schema.dependency import Dependency -def generate_report(output_path, disable_apis): - vulnerabilities = process_vulns(output_path, disable_apis) - dependencies = process_deps(output_path) +def generate_report(output_path: str, disable_apis: bool) -> None: + """ + Generates a report based on the specified output path and whether to disable certain APIs. - secrets_vulnerabilities = group_secrets_vulnerabilities([item for item in vulnerabilities if item.category == "Secrets detection"]) - sast_vulnerabilities = group_sast_vulnerabilities([item for item in vulnerabilities if item.category == "Static Application Security Testing (SAST)"]) - sca_vulnerabilities = group_sca_vulnerabilities([item for item in vulnerabilities if item.category == "Software Compose Analysis (SCA)"]) - containers_vulnerabilities = group_container_vulnerabilities([item for item in vulnerabilities if item.category == "Container analysis"]) - iac_vulnerabilities = group_iac_vulnerabilities([item for item in vulnerabilities if item.category == "IaC code analysis"]) + :param output_path: The path where the report will be generated. + :param disable_apis: Whether to disable certain APIs during the report generation. + """ + vulnerabilities: List[Vulnerability] = process_vulns(output_path, disable_apis) + dependencies: List[Dependency] = process_deps(output_path) - grouped_vulnerabilities = { + secrets_vulnerabilities: Dict[str, List[Vulnerability]] = group_secrets_vulnerabilities([item for item in vulnerabilities if item.category == "Secrets detection"]) + sast_vulnerabilities: Dict[str, List[Vulnerability]] = group_sast_vulnerabilities([item for item in vulnerabilities if item.category == "Static Application Security Testing (SAST)"]) + sca_vulnerabilities: Dict[str, List[Vulnerability]] = group_sca_vulnerabilities([item for item in vulnerabilities if item.category == "Software Compose Analysis (SCA)"]) + containers_vulnerabilities: Dict[str, List[Vulnerability]] = group_container_vulnerabilities([item for item in vulnerabilities if item.category == "Container analysis"]) + iac_vulnerabilities: Dict[str, List[Vulnerability]] = group_iac_vulnerabilities([item for item in vulnerabilities if item.category == "IaC code analysis"]) + + grouped_vulnerabilities: Dict[str, Dict[str, List[Vulnerability]]] = { "vulnerabilities": { "Static Application Security Testing (SAST)": sast_vulnerabilities, "Software Compose Analysis (SCA)": sca_vulnerabilities, @@ -24,7 +33,7 @@ def generate_report(output_path, disable_apis): } } - report = { + report: Dict[str, Dict[str, List[Vulnerability]]] = { "sbom": dependencies, "vulnerabilities": grouped_vulnerabilities }