|  | 
|  | 1 | +# | 
|  | 2 | +# Copyright (c) nexB Inc. and others. All rights reserved. | 
|  | 3 | +# VulnerableCode is a trademark of nexB Inc. | 
|  | 4 | +# SPDX-License-Identifier: Apache-2.0 | 
|  | 5 | +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. | 
|  | 6 | +# See https://github.com/aboutcode-org/vulnerablecode for support or download. | 
|  | 7 | +# See https://aboutcode.org for more information about nexB OSS projects. | 
|  | 8 | +# | 
|  | 9 | + | 
|  | 10 | +import json | 
|  | 11 | +import logging | 
|  | 12 | +import traceback | 
|  | 13 | +from typing import Iterable | 
|  | 14 | +from urllib.parse import urljoin | 
|  | 15 | + | 
|  | 16 | +import pytz | 
|  | 17 | +from dateutil import parser as dateparser | 
|  | 18 | +from packageurl import PackageURL | 
|  | 19 | +from univers.version_range import RANGE_CLASS_BY_SCHEMES | 
|  | 20 | +from univers.version_range import VersionRange | 
|  | 21 | +from univers.version_range import from_gitlab_native | 
|  | 22 | + | 
|  | 23 | +from vulnerabilities.importer import AdvisoryData | 
|  | 24 | +from vulnerabilities.importer import AffectedPackageV2 | 
|  | 25 | +from vulnerabilities.importer import ReferenceV2 | 
|  | 26 | +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 | 
|  | 27 | +from vulnerabilities.pipelines.v2_importers.gitlab_importer import get_purl | 
|  | 28 | +from vulnerabilities.utils import build_description | 
|  | 29 | +from vulnerabilities.utils import get_cwe_id | 
|  | 30 | +from vulntotal.datasources.gitlab import get_casesensitive_slug | 
|  | 31 | +from vulntotal.datasources.gitlab_api import fetch_gitlab_advisories_for_purl | 
|  | 32 | +from vulntotal.datasources.gitlab_api import get_estimated_advisories_count | 
|  | 33 | + | 
|  | 34 | + | 
class GitLabLiveImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    GitLab Live Importer Pipeline

    Collect advisories from the GitLab Advisory Database (Open Source Edition)
    for a single PURL.

    The package to look up is read from ``self.inputs["purl"]`` and must be a
    versioned PURL (string or ``PackageURL``) of one of ``supported_types``.
    Only advisories whose affected version range contains that version are
    yielded by ``collect_advisories``.
    """

    pipeline_id = "gitlab_live_importer_v2"
    spdx_license_expression = "MIT"
    license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE"
    supported_types = ["pypi", "npm", "maven", "nuget", "composer", "conan", "gem"]

    @classmethod
    def steps(cls):
        """Return the ordered pipeline steps: validate the input, then import."""
        return (
            cls.get_purl_inputs,
            cls.collect_and_store_advisories,
        )

    def get_purl_inputs(self):
        """
        Validate the ``purl`` pipeline input and store it as ``self.purl``.

        Raise ValueError when the input is missing or empty, is not a PURL,
        has an unsupported package type or carries no version.
        """
        try:
            purl = self.inputs["purl"]
        except KeyError:
            # A missing key is reported the same way as an empty value below.
            purl = None

        if not purl:
            raise ValueError("PURL is required for GitLabLiveImporterPipeline")

        if isinstance(purl, str):
            purl = PackageURL.from_string(purl)

        if not isinstance(purl, PackageURL):
            raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

        if purl.type not in self.supported_types:
            raise ValueError(
                f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
            )

        if not purl.version:
            raise ValueError(f"PURL: {purl!s} is expected to have a version")

        self.purl = purl

    # PURL type keyed by the GitLab package scheme.
    purl_type_by_gitlab_scheme = {
        "conan": "conan",
        "gem": "gem",
        # "go" is disabled pending an issue with parsing go package names:
        # https://github.com/nexB/vulnerablecode/issues/742
        # "go": "golang",
        "maven": "maven",
        "npm": "npm",
        "nuget": "nuget",
        "packagist": "composer",
        "pypi": "pypi",
    }

    # Reverse mapping: GitLab package scheme keyed by PURL type.
    gitlab_scheme_by_purl_type = {v: k for k, v in purl_type_by_gitlab_scheme.items()}

    def advisories_count(self):
        """Return the estimated number of GitLab advisories for ``self.purl``."""
        return get_estimated_advisories_count(
            self.purl, self.gitlab_scheme_by_purl_type, get_casesensitive_slug
        )

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield an AdvisoryData for each fetched GitLab advisory whose affected
        version range contains the version of ``self.purl``.
        """
        advisories = fetch_gitlab_advisories_for_purl(
            self.purl, self.gitlab_scheme_by_purl_type, get_casesensitive_slug
        )

        input_version = self.purl.version
        vrc = RANGE_CLASS_BY_SCHEMES[self.purl.type]
        version_obj = vrc.version_class(input_version) if input_version else None

        for advisory in advisories:
            advisory_data = self._advisory_dict_to_advisory_data(advisory)

            # advisory_dict_to_advisory_data() returns None when an advisory has
            # neither an affected nor a fixed version range: nothing to match.
            if advisory_data is None:
                continue

            is_affected = any(
                package.affected_version_range and version_obj in package.affected_version_range
                for package in advisory_data.affected_packages
            )
            if is_affected:
                yield advisory_data

    def _advisory_dict_to_advisory_data(self, advisory):
        """Convert a GitLab ``advisory`` dict using this pipeline's mappings, logger and PURL."""
        return advisory_dict_to_advisory_data(
            advisory=advisory,
            purl_type_by_gitlab_scheme=self.purl_type_by_gitlab_scheme,
            gitlab_scheme_by_purl_type=self.gitlab_scheme_by_purl_type,
            logger=self.log,
            purl=self.purl,
        )
|  | 123 | + | 
|  | 124 | + | 
def advisory_dict_to_advisory_data(
    advisory: dict,
    purl_type_by_gitlab_scheme,
    gitlab_scheme_by_purl_type,
    logger,
    purl=None,
    advisory_url=None,
):
    """
    Convert a GitLab ``advisory`` dict to an AdvisoryData.

    ``purl_type_by_gitlab_scheme`` and ``gitlab_scheme_by_purl_type`` map
    between GitLab package schemes and PURL types. ``logger`` is a callable
    accepting a message and a ``level`` keyword. When ``purl`` is not provided
    it is derived from the advisory "package_slug". When ``advisory_url`` is
    not provided it is built from the "package_slug" and "identifier".

    Return None when neither an affected nor a fixed version range can be
    determined. Return an AdvisoryData without affected packages when no PURL
    can be determined. The ``advisory`` mapping is not modified.
    """
    # Copy the list: ``advisory`` is serialized below as the original advisory
    # text and belongs to the caller, so the alias clean-up must not alter it.
    aliases = list(advisory.get("identifiers") or [])
    identifier = advisory.get("identifier", "")
    package_slug = advisory.get("package_slug")

    advisory_id = f"{package_slug}/{identifier}" if package_slug else identifier
    if advisory_id in aliases:
        aliases.remove(advisory_id)

    summary = build_description(advisory.get("title"), advisory.get("description"))
    # ``or []`` also covers a key that is present with a null value.
    urls = advisory.get("urls") or []
    references = [ReferenceV2.from_url(u) for u in urls]

    cwe_ids = advisory.get("cwe_ids") or []
    cwe_list = list(map(get_cwe_id, cwe_ids))

    # An advisory without a publication date is still usable.
    date_published = None
    pubdate = advisory.get("pubdate")
    if pubdate:
        date_published = dateparser.parse(pubdate).replace(tzinfo=pytz.UTC)

    # Determine purl if not provided
    if not purl:
        purl = get_purl(
            package_slug=package_slug,
            purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
            logger=logger,
        )

    if not purl:
        logger(
            f"advisory_dict_to_advisory_data: purl is not valid: {package_slug!r}",
            level=logging.ERROR,
        )
        return AdvisoryData(
            advisory_id=advisory_id,
            aliases=aliases,
            summary=summary,
            references_v2=references,
            date_published=date_published,
            url=advisory_url,
        )

    affected_version_range = None
    fixed_versions = advisory.get("fixed_versions") or []
    affected_range = advisory.get("affected_range")
    # GitLab schemes whose ranges are parsed with from_gitlab_native(); the
    # other schemes go through the range class own from_native().
    gitlab_native_schemes = {"pypi", "gem", "npm", "go", "packagist", "conan"}
    vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type]
    gitlab_scheme = gitlab_scheme_by_purl_type[purl.type]
    try:
        if affected_range:
            if gitlab_scheme in gitlab_native_schemes:
                affected_version_range = from_gitlab_native(
                    gitlab_scheme=gitlab_scheme, string=affected_range
                )
            else:
                affected_version_range = vrc.from_native(affected_range)
    except Exception as e:
        # Best effort: an unparsable range is logged and treated as unknown.
        logger(
            f"advisory_dict_to_advisory_data: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}",
            level=logging.ERROR,
        )

    parsed_fixed_versions = []
    for fixed_version in fixed_versions:
        try:
            fixed_version = vrc.version_class(fixed_version)
            parsed_fixed_versions.append(fixed_version.string)
        except Exception as e:
            # Best effort: skip and log fixed versions that cannot be parsed.
            logger(
                f"advisory_dict_to_advisory_data: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}",
                level=logging.ERROR,
            )

    # Build the fixed range with the same range class as the affected range.
    if affected_version_range:
        vrc = affected_version_range.__class__

    fixed_version_range = vrc.from_versions(parsed_fixed_versions)
    if not fixed_version_range and not affected_version_range:
        return

    # The affected package is derived from the slug rather than from ``purl``,
    # which may carry a version.
    purl_without_version = get_purl(
        package_slug=package_slug,
        purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
        logger=logger,
    )

    affected_package = AffectedPackageV2(
        package=purl_without_version,
        affected_version_range=affected_version_range,
        fixed_version_range=fixed_version_range,
    )

    if not advisory_url and package_slug and identifier:
        advisory_url = urljoin(
            "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/",
            package_slug + "/" + identifier + ".yml",
        )

    return AdvisoryData(
        advisory_id=advisory_id,
        aliases=aliases,
        summary=summary,
        references_v2=references,
        date_published=date_published,
        affected_packages=[affected_package],
        weaknesses=cwe_list,
        url=advisory_url,
        original_advisory_text=json.dumps(advisory, indent=2, ensure_ascii=False),
    )
0 commit comments