From 8b6cf2a50902c3354892ce54cd6558d18f8343a4 Mon Sep 17 00:00:00 2001 From: Jamie Strandboge Date: Fri, 18 Aug 2023 10:53:25 -0500 Subject: [PATCH 1/6] chore: rename formatWhereFromNamespace as formatWhereFromOCIType --- cvelib/scan.py | 14 +++++++++----- tests/test_scan.py | 8 +++++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/cvelib/scan.py b/cvelib/scan.py index 7c763bc..3571ab2 100644 --- a/cvelib/scan.py +++ b/cvelib/scan.py @@ -364,7 +364,7 @@ def getScanOCIsReportUnused( return s.rstrip() -def formatWhereFromNamespace( +def formatWhereFromOCIType( oci_type: str, namespace: str, where_override: str = "" ) -> str: where: str = "unknown" @@ -380,6 +380,10 @@ def formatWhereFromNamespace( if where_override == "": w = namespace where = "quay-%s" % w + elif oci_type == "dso": + where = "dockerhub" + if where_override != "": + where += "-%s" % where_override elif where_override != "": where = where_override @@ -409,15 +413,15 @@ def _parseScanURL(url: str, where_override: str = "") -> Tuple[str, str, str, st if pat.search(url): # https://us-docker.pkg.dev/PROJECT/REPO/IMGNAME@sha256:... namespace = "%s/%s" % (tmp[3], tmp[2].rsplit("-", maxsplit=1)[0]) - where = formatWhereFromNamespace("gar", namespace, where_override) + where = formatWhereFromOCIType("gar", namespace, where_override) software = tmp[4] modifier = tmp[5] elif url.startswith("https://quay.io/repository/"): # quay.io # https://quay.io/repository/ORG/IMGNAME/manifest/sha256:... - where = formatWhereFromNamespace("quay", tmp[4], where_override) + where = formatWhereFromOCIType("quay", tmp[4], where_override) software = tmp[5] else: - where = formatWhereFromNamespace("", "", where_override) + where = formatWhereFromOCIType("", "", where_override) software = "TBD" return (product, where, software, modifier) @@ -431,7 +435,7 @@ def parseNsAndImageToPkg( return ("", "", "", "") product: str = "oci" - where: str = formatWhereFromNamespace(oci_type, namespace, where_override) + where: str = formatWhereFromOCIType(oci_type, namespace, where_override) software: str = img.split("@")[0].split("/")[0] modifier: str = "" if oci_type == "gar": diff --git a/tests/test_scan.py b/tests/test_scan.py index 537f72a..b9759b2 100644 --- a/tests/test_scan.py +++ b/tests/test_scan.py @@ -600,8 +600,8 @@ def test_getScanOCIsReportUnused(self): res = cvelib.scan.getScanOCIsReportUnused(ocis, fixable=fixable) self.assertEqual(exp, res) - def test_formatWhereFromNamespace(self): - """Test formatWhereFromNamespace()""" + def test_formatWhereFromOCIType(self): + """Test formatWhereFromFromOCIType()""" tsts = [ # oci_type, namespace, where_override, exp ("", "", "", "unknown"), @@ -614,12 +614,14 @@ def test_formatWhereFromNamespace(self): ("gar", "proj/us", "ovr", "gar-ovr"), ("quay", "org", "", "quay-org"), ("quay", "org", "ovr", "quay-ovr"), + ("dso", "", "", "dockerhub"), + ("dso", "", "ovr", "dockerhub-ovr"), ("other", "b@d", "", "unknown"), ("other", "", "b@d", "unknown"), ] for oci_type, ns, whr, exp in tsts: - res = cvelib.scan.formatWhereFromNamespace(oci_type, ns, whr) + res = cvelib.scan.formatWhereFromOCIType(oci_type, ns, whr) self.assertEqual(exp, res) def test__parseScanURL(self): From bc1106d33a943796458d1c21691a1eb2c5d30401 Mon Sep 17 00:00:00 2001 From: Jamie Strandboge Date: Fri, 18 Aug 2023 12:14:01 -0500 Subject: [PATCH 2/6] chore: update parseNsAndImage* to not consider ns with dso --- cvelib/scan.py | 7 ++++--- tests/test_scan.py | 10 ++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/cvelib/scan.py b/cvelib/scan.py index 3571ab2..a7b74e7 100644 --- a/cvelib/scan.py +++ b/cvelib/scan.py @@ -431,7 +431,7 @@ def parseNsAndImageToPkg( oci_type: str, namespace: str, img: str, where_override: str = "" ) -> Tuple[str, str, str, str]: """Find CVE 'product', 'where', 'software' and 'modifier' from namespace and image name""" - if namespace == "" or img == "": + if (namespace == "" and oci_type != "dso") or img == "": return ("", "", "", "") product: str = "oci" @@ -449,7 +449,7 @@ def parseNsAndImageToURLPattern( oci_type: str, namespace: str, img: str, where_override: str = "" ) -> Union[None, Pattern]: """Find ScanOCI 'url' from namespace and image""" - if namespace == "" or img == "": + if (namespace == "" and oci_type != "dso") or img == "": return None pat: Union[None, Pattern] = None @@ -482,7 +482,8 @@ def parseNsAndImageToURLPattern( elif oci_type == "dso": # dso has no concept of org pat = re.compile( - "^https://dso.docker.com/images/%s/digests/sha256:" % (namespace) + "^https://dso.docker.com/images/%s/digests/sha256:" + % (img.split("@", maxsplit=1)[0]) ) return pat diff --git a/tests/test_scan.py b/tests/test_scan.py index b9759b2..0ab6759 100644 --- a/tests/test_scan.py +++ b/tests/test_scan.py @@ -693,6 +693,8 @@ def test_parseNsAndImageToPkg(self): ("gar", "foo/loc", "bar/baz", "ovr", "oci", "gar-ovr", "bar", "baz"), ("quay", "foo", "bar", "", "oci", "quay-foo", "bar", ""), ("quay", "foo", "bar", "ovr", "oci", "quay-ovr", "bar", ""), + ("dso", "", "bar", "", "oci", "dockerhub", "bar", ""), + ("dso", "", "bar", "ovr", "oci", "dockerhub-ovr", "bar", ""), ] for oci_type, ns, img, whr, expP, expW, expS, expM in tsts: @@ -814,32 +816,32 @@ def test_parseNsAndImageToURLPattern(self): ), ( "dso", - "foo", "ignored", + "foo", "", "https://dso.docker.com/images/foo/digests/sha256:deadbeef", True, ), ( "dso", - "other", "ignored", + "other", "", "https://dso.docker.com/images/foo/digests/sha256:deadbeef", False, ), ( "dso", - "foo", "ignored", + "foo", "ovr", "https://dso.docker.com/images/foo/digests/sha256:deadbeef", True, ), ( "dso", - "other", "ignored", + "other", "ovr", "https://dso.docker.com/images/foo/digests/sha256:deadbeef", False, From e1ba91ab12b8a62634422ffa187ab16280b4258f Mon Sep 17 00:00:00 2001 From: Jamie Strandboge Date: Fri, 18 Aug 2023 14:36:51 -0500 Subject: [PATCH 3/6] chore: update _parseScanURL() to support dso --- cvelib/scan.py | 3 +++ tests/test_scan.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/cvelib/scan.py b/cvelib/scan.py index a7b74e7..ac29edb 100644 --- a/cvelib/scan.py +++ b/cvelib/scan.py @@ -420,6 +420,9 @@ def _parseScanURL(url: str, where_override: str = "") -> Tuple[str, str, str, st # https://quay.io/repository/ORG/IMGNAME/manifest/sha256:... where = formatWhereFromOCIType("quay", tmp[4], where_override) software = tmp[5] + elif url.startswith("https://dso.docker.com/"): + where = formatWhereFromOCIType("dso", "", where_override) + software = tmp[4] else: where = formatWhereFromOCIType("", "", where_override) software = "TBD" diff --git a/tests/test_scan.py b/tests/test_scan.py index 0ab6759..2fe4e4e 100644 --- a/tests/test_scan.py +++ b/tests/test_scan.py @@ -672,6 +672,22 @@ def test__parseScanURL(self): "IMGNAME", "", ), + ( + "https://dso.docker.com/images/IMGNAME/digests/sha256:deadbeef", + "", + "oci", + "dockerhub", + "IMGNAME", + "", + ), + ( + "https://dso.docker.com/images/IMGNAME/digests/sha256:deadbeef", + "override", + "oci", + "dockerhub-override", + "IMGNAME", + "", + ), ] for url, whr, expP, expW, expS, expM in tsts: From 50080f800df4d793dd510808a06e537dde630d76 Mon Sep 17 00:00:00 2001 From: Jamie Strandboge Date: Fri, 18 Aug 2023 10:35:06 -0500 Subject: [PATCH 4/6] fix: --list and --namespace not supported with dso --- README.md | 5 +- cvelib/dso.py | 196 +++++++++++++++++++++++-------------------- cvelib/report.py | 121 +++++++++++++++----------- tests/test_dso.py | 171 ++++++++++++++++++++----------------- tests/test_report.py | 85 +++---------------- 5 files changed, 283 insertions(+), 295 deletions(-) diff --git a/README.md b/README.md index 8fdea24..3b4f00a 100644 --- a/README.md +++ b/README.md @@ -118,9 +118,8 @@ $ cve-report quay --alerts --name /@ # Docker DSO container security reports - $ cve-report dso --list - $ cve-report dso --list-digest / - $ cve-report dso --alerts --name /@ + $ cve-report dso --list-digest [:|@] + $ cve-report dso --alerts --name [:|@] # if desired, leave the venv $ deactivate diff --git a/cvelib/dso.py b/cvelib/dso.py index ddd13d9..ee0f2e9 100644 --- a/cvelib/dso.py +++ b/cvelib/dso.py @@ -167,26 +167,37 @@ class DockerDSOSecurityReportNew(SecurityReportInterface): # } def getDigestForImage(self, repo_full: str) -> str: """Obtain the digest for the the specified repo""" - if "/" not in repo_full: - error("Please use REPO/TAG", do_exit=False) - return "" - - ns: str = "" - name: str = "" - ns, name = repo_full.split("/", 2) + name = repo_full + sha256: str = "" + tagsearch: str = "" + if "@sha256:" in name: + name, sha256 = name.split("@", 2) + elif ":" in name: + name, tagsearch = name.split(":", 2) - rese = _getListEDN(ns) + rese = _getListEDN(name) if len(rese) < 1: # error condition from _getListEDN() return "" digest: str = "" + latest_d: Union[None, datetime] = None for img in rese["docker-repository-tags"]["data"]: if "image" in img and "docker.image/tags" in img["image"]: - if name in img["image"]["docker.image/tags"]: - digest = img["image"]["docker.image/digest"] + if sha256 != "" and sha256 == img["image"]["docker.image/digest"]: + digest = sha256 + break + elif ( + tagsearch != "" and tagsearch in img["image"]["docker.image/tags"] + ) or tagsearch == "": + if ( + latest_d is None + or img["image"]["docker.image/created-at"] > latest_d + ): + digest = img["image"]["docker.image/digest"] + latest_d = img["image"]["docker.image/created-at"] if digest != "": - return "%s/%s@%s" % (ns, name, digest) + return "%s@%s" % (name, digest) return "" @@ -199,82 +210,15 @@ def parseImageDigest(self, digest: str) -> Tuple[str, str, str]: error("Malformed digest '%s' (should have 1 '@')" % digest) return ("", "", "") - tmp: str = "" sha256: str = "" - tmp, sha256 = digest.split("@") + repo, sha256 = digest.split("@") - if tmp.count("/") != 1: - error("Malformed digest '%s' (should have 1 '/')" % digest) - return ("", "", "") + return ("", repo, sha256) - ns: str = "" - repo: str = "" - ns, repo = tmp.split("/") - return (ns, repo, sha256) - - # $ curl -X POST https://api.dso.docker.com/datalog/shared-vulnerability/queries - # --data-binary '_getListEDN()' - # { - # "docker-repository-tags": { - # "data": [ - # { - # "image": { - # "docker.image/digest": "sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9", - # "docker.image/created-at": "2022-12-16 00:23:40+00:00", - # "docker.image/tags": [ - # "1.0-foo", - # "1-foo", - # "foo" - # ] - # } - # } - # ], - # "basis-t": "12345678", - # "tx": "12345678901234" - # }, - # "extensions": { - # "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" - # } - # } - def getOCIsForNamespace(self, namespace: str) -> List[Tuple[str, int]]: + def getOCIsForNamespace(self, _: str) -> List[Tuple[str, int]]: # pragma: nocover """Obtain the list of DockerDSO repos for the specified namespace""" - if sys.stdout.isatty(): - print("Fetching list of repos:", end="", flush=True) - - rese = _getListEDN(namespace) - if len(rese) < 1: # error condition from _getListEDN() - return [] - - # gather all the tags and add the ones with the latest date - repos: List[Tuple[str, int]] = [] - tmp: Dict[str, int] = {} - for img in rese["docker-repository-tags"]["data"]: - if "image" in img and "docker.image/tags" in img["image"]: - name: str = "" - # For now, take the longest tag, assuming it is the most accurate - # name (eg, 8 vs 8.1 vs 8.1.2). This may need to be adjusted... - for tagname in img["image"]["docker.image/tags"]: - if len(tagname) > len(name): - name = tagname - - m: int = 0 - if ( - "docker.image/created-at" in img["image"] - and img["image"]["docker.image/created-at"] is not None - ): - # convert to expected format (epoch) - m = int(img["image"]["docker.image/created-at"].strftime("%s")) - - if name not in tmp or m > tmp[name]: - tmp[name] = m - - for name in tmp: - repos.append((name, tmp[name])) - - if sys.stdout.isatty(): - print(" done!") - - return copy.deepcopy(repos) + # dso doesn't have a concept of namespaces + raise NotImplementedError def fetchScanReport( self, @@ -285,8 +229,8 @@ def fetchScanReport( priorities: List[str] = [], ) -> Tuple[List[ScanOCI], str]: """Obtain the security manifest for the specified repo@sha256:...""" - if "/" not in repo_full or "@sha256:" not in repo_full: - error("Please use REPO/TAG@sha256:", do_exit=False) + if "@sha256:" not in repo_full: + error("Please use REPO@sha256:SHA256", do_exit=False) return [], "" purls: Dict[str, List[str]] = _fetchPackageURLs(repo_full.split("@")[-1]) @@ -306,7 +250,7 @@ def fetchScanReport( return [], "" url: str = "https://dso.docker.com/images/%s/digests/%s" % ( - repo_full.split("/")[0], + repo_full.split("@")[0], repo_full.split("@")[1], ) @@ -784,6 +728,75 @@ def _getListEDN(namespace: str, days: int = 365) -> Dict: return copy.deepcopy(rese) +# $ curl -X POST https://api.dso.docker.com/datalog/shared-vulnerability/queries +# --data-binary '_getListEDN()' +# { +# "docker-repository-tags": { +# "data": [ +# { +# "image": { +# "docker.image/digest": "sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9", +# "docker.image/created-at": "2022-12-16 00:23:40+00:00", +# "docker.image/tags": [ +# "1.0-foo", +# "1-foo", +# "foo" +# ] +# } +# } +# ], +# "basis-t": "12345678", +# "tx": "12345678901234" +# }, +# "extensions": { +# "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" +# } +# } +def _getOCIsForRepo(repo_name: str) -> List[Tuple[str, int]]: + """Obtain the list of DockerDSO tags for the specified repo""" + if ":" in repo_name or "@" in repo_name or "/" in repo_name: + error("Please use REPO (without :TAG or @sha256:SHA256)") + return [] # for tests + + if sys.stdout.isatty(): + print("Fetching list of repos:", end="", flush=True) + + rese = _getListEDN(repo_name) + if len(rese) < 1: # error condition from _getListEDN() + return [] + + # gather all the tags and add the ones with the latest date + repos: List[Tuple[str, int]] = [] + tmp: Dict[str, int] = {} + for img in rese["docker-repository-tags"]["data"]: + if "image" in img and "docker.image/tags" in img["image"]: + name: str = "" + # For now, take the longest tag, assuming it is the most accurate + # name (eg, 8 vs 8.1 vs 8.1.2). This may need to be adjusted... + for tagname in img["image"]["docker.image/tags"]: + if len(tagname) > len(name): + name = tagname + + m: int = 0 + if ( + "docker.image/created-at" in img["image"] + and img["image"]["docker.image/created-at"] is not None + ): + # convert to expected format (epoch) + m = int(img["image"]["docker.image/created-at"].strftime("%s")) + + if name not in tmp or m > tmp[name]: + tmp[name] = m + + for name in tmp: + repos.append((name, tmp[name])) + + if sys.stdout.isatty(): + print(" done!") + + return copy.deepcopy(repos) + + # # CLI mains # @@ -830,7 +843,7 @@ def main_dso_dump_reports(): sr = DockerDSOSecurityReportNew() # Find latest digest for all images - oci_names: List[Tuple[str, int]] = sr.getOCIsForNamespace(args.name) + oci_names: List[Tuple[str, int]] = _getOCIsForRepo(args.name) if len(oci_names) == 0: error("Could not enumerate any OCI image names") return # for tests @@ -842,12 +855,12 @@ def main_dso_dump_reports(): if sys.stdout.isatty(): # pragma: nocover print(".", end="", flush=True) - name: str = "%s/%s" % (args.name, oci.split("/", maxsplit=5)[-1]) + name: str = "%s:%s" % (args.name, oci) digest: str = sr.getDigestForImage(name) if digest == "": warn("Could not find digest for %s" % name) continue - ocis.append("%s@%s" % (name, digest.split("@")[1])) + ocis.append("%s@%s" % (args.name, digest.split("@")[1])) if sys.stdout.isatty(): # pragma: nocover print(" done!", flush=True) @@ -864,7 +877,7 @@ def main_dso_dump_reports(): # gather a list of potentially matching filenames json_files: Dict[str, str] = {} for root, _, files in os.walk(args.path): - if "/dso/%s/" % args.name not in root: # quick prune + if not root.endswith("/dso/%s" % args.name): # quick prune continue for f in files: if f.endswith(".json"): @@ -900,7 +913,7 @@ def main_dso_dump_reports(): if len(j) == 0: continue - repo_name: str = full_name.split("@")[0].split("/")[-1] + repo_name: str = full_name.split("@")[0] sha256: str = full_name.split("@")[1].split(":")[-1] if sha256 not in json_files: # create under dir with today's date @@ -911,7 +924,6 @@ def main_dso_dump_reports(): "%0.2d" % dobj.month, "%0.2d" % dobj.day, "dso", - args.name, repo_name, ]: dir = os.path.join(dir, subdir) diff --git a/cvelib/report.py b/cvelib/report.py index 73c99c7..e910f11 100644 --- a/cvelib/report.py +++ b/cvelib/report.py @@ -1076,8 +1076,10 @@ def getOCIReports( continue if "@sha256:" not in img: # search for tag or latest - digest: str = sr.getDigestForImage("%s/%s" % (namespace, img)) - if digest == "" or "/" not in digest: + digest: str = sr.getDigestForImage( + "%s/%s" % (namespace, img) if namespace != "" else img + ) + if digest == "" or "@" not in digest: warn("Could not find digest for %s" % img) continue _, repo, sha256 = sr.parseImageDigest(digest) @@ -1085,7 +1087,7 @@ def getOCIReports( if raw: _, json = sr.fetchScanReport( - "%s/%s" % (namespace, img), + "%s/%s" % (namespace, img) if namespace != "" else img, raw=raw, fixable=fixable, priorities=priorities, @@ -1093,7 +1095,7 @@ def getOCIReports( scan_raws[img] = json else: scan_ocis[img], msg = sr.fetchScanReport( - "%s/%s" % (namespace, img), + "%s/%s" % (namespace, img) if namespace != "" else img, raw=raw, fixable=fixable, priorities=priorities, @@ -1123,7 +1125,7 @@ def getOCIReports( pat: Pattern = re.compile(r"sha256:.*") for img in scan_ocis: - repo_full: str = "%s/%s" % (namespace, img) + repo_full: str = "%s/%s" % (namespace, img) if namespace != "" else img pat_url = cvelib.scan.parseNsAndImageToURLPattern( sr.name, namespace, img, where_override=oci_where_override ) @@ -2069,27 +2071,27 @@ def _main_report_parse_args(sysargs: Sequence[str]) -> argparse.Namespace: # OCI reports - # Show list of OCI image names - $ cve-report gar --namespace / --list # eg, foo/us - $ cve-report quay --namespace --list - $ cve-report dso --namespace --list - - # Show list of GAR repositories - $ cve-report gar --namespace --list-repos - # Show latest SHA256 digest with a scan result for image name + $ cve-report dso --list-digest $ cve-report gar --namespace / --list-digest / $ cve-report quay --namespace --list-digest # Show SHA256 digest for image name with tag + $ cve-report dso --list-digest : $ cve-report gar --namespace / --list-digest /: $ cve-report quay --namespace --list-digest : - $ cve-report dso --namespace --list-digest # Show security report for image name with digest + $ cve-report dso --alerts --images @ $ cve-report gar --alerts --namespace / --images /@ $ cve-report quay --alerts --namespace --images @ - $ cve-report dso --alerts --namespace --images @ + + # Show list of OCI image names for an organizational group (gar and quay) + $ cve-report gar --namespace / --list # eg, foo/us + $ cve-report quay --namespace --list + + # Show list of GAR repositories + $ cve-report gar --namespace --list-repos # Eg, to research the 'foo' project with location 'us' in GAR: # - find all the container images @@ -2138,8 +2140,9 @@ def _main_report_parse_args(sysargs: Sequence[str]) -> argparse.Namespace: * other/eu norf/corge:1.2.3-alpine cve-report abstracts this to use 'namespace' to denote the organizational - grouping so that invocation is the same across each registry. Eg: - * Docker HUB: use --namespace and --images ,... + grouping so that invocation is the same across each registry. Since Docker + Hub does not support organizational groups, it should be omitted. Eg: + * Docker HUB: use --images ,... * GAR: use --namespace / and --images /,... * Quay.io: use --namespace and --images ,... """ @@ -2200,13 +2203,7 @@ def _add_issues_filter(p): action="store_true", ) - def _add_common_oci(p, what: str, where: str, imgname: str): - p.add_argument( - "--list", - dest="list", - help="list %s OCI image names" % what, - action="store_true", - ) + def _add_common_oci(p, what: str, imgname: str): bare_imgname: str = imgname.split("@")[0] p.add_argument( "--list-digest", @@ -2248,14 +2245,6 @@ def _add_common_oci(p, what: str, where: str, imgname: str): help="also show unfixable items in %s security reports" % what, action="store_true", ) - p.add_argument( - "--namespace", - dest="namespace", - help="%s namespace (eg %s)" % (what, where), - metavar=where, - type=str, - default=None, - ) p.add_argument( "--images", dest="images", @@ -2416,20 +2405,47 @@ def _add_common_oci(p, what: str, where: str, imgname: str): # quay parser_quay = sub.add_parser("quay") - _add_common_oci(parser_quay, "quay.io", "ORG", "NAME@sha256:SHA256") + parser_quay.add_argument( + "--namespace", + dest="namespace", + help="quay.io namespace (eg ORG)", + metavar="ORG", + type=str, + default=None, + ) + _add_common_oci(parser_quay, "quay.io", "NAME@sha256:SHA256") + parser_quay.add_argument( + "--list", + dest="list", + help="list quay.io OCI image names", + action="store_true", + ) # dso parser_dso = sub.add_parser("dso") - _add_common_oci(parser_dso, "dso.docker.com", "REPO", "NAME@sha256:SHA256") + _add_common_oci(parser_dso, "dso.docker.com", "NAME@sha256:SHA256") # gar parser_gar = sub.add_parser("gar") + parser_gar.add_argument( + "--namespace", + dest="namespace", + help="GAR namespace (eg PROJECT/LOCATION)", + metavar="PROJECT/LOCATION", + type=str, + default=None, + ) _add_common_oci( parser_gar, "GAR", - "PROJECT/LOCATION", "REPO/NAME@sha256:SHA256", ) + parser_gar.add_argument( + "--list", + dest="list", + help="list GAR OCI image names", + action="store_true", + ) parser_gar.add_argument( "--list-repos", dest="list_repos", @@ -2510,7 +2526,7 @@ def _add_common_oci(p, what: str, where: str, imgname: str): "Please specify one of --alerts, --list, --list-repos or --list-digest with 'gar'" ) elif ( - args.cmd in ["dso", "quay"] + args.cmd == "quay" and not args.alerts and not args.list and not args.list_digest @@ -2519,9 +2535,11 @@ def _add_common_oci(p, what: str, where: str, imgname: str): "Please specify one of --alerts, --list or --list-digest with '%s'" % args.cmd ) - elif not args.namespace: + elif args.cmd == "dso" and not args.alerts and not args.list_digest: + error("Please specify --alerts or --list-digest with '%s'" % args.cmd) + elif args.cmd in ["gar", "quay"] and not args.namespace: error("Please specify --namespace with '%s'" % args.cmd) - elif args.cmd in ["dso", "quay"] and args.namespace.count("/") > 0: + elif args.cmd == "quay" and args.namespace.count("/") > 0: error("--namespace '%s' should not contain '/'" % args.namespace) elif args.cmd == "gar" and args.namespace.count("/") != 1: error("--namespace '%s' should contain one '/'" % args.namespace) @@ -2537,7 +2555,7 @@ def _add_common_oci(p, what: str, where: str, imgname: str): # below here are --alerts specific elif args.raw and (args.with_templates or args.all): error("--raw not supported with --all or --with-templates") - elif args.list: + elif args.cmd in ["gar", "quay"] and args.list: error("Unsupported option --list with --alerts") elif args.list_digest: error("Unsupported option --list-digest with --alerts") @@ -2729,7 +2747,7 @@ def main_report(sysargs: Optional[Sequence[str]] = None): else: sr = cvelib.gar.GARSecurityReportNew() - if args.list: + if args.cmd in ["gar", "quay"] and args.list: def formatDate(t: int) -> str: s: str = "unknown" @@ -2742,10 +2760,6 @@ def formatDate(t: int) -> str: for (r, m) in sorted(ocis): # ORG/NAME print("%s/%s %s" % (args.namespace, r, formatDate(m))) - elif args.cmd == "dso": - for (r, m) in sorted(ocis): - # REPO/NAME - print("%s/%s %s" % (args.namespace, r, formatDate(m))) elif args.cmd == "gar": for (r, m) in sorted(ocis): # PROJECT/LOCATION/REPO/NAME @@ -2759,16 +2773,23 @@ def formatDate(t: int) -> str: # PROJECT/LOCATION/REPO print("%s/%s" % (args.namespace, r.split("/")[-1])) elif args.list_digest: - digest: str = sr.getDigestForImage( - "%s/%s" % (args.namespace, args.list_digest) - ) + digest: str + if args.cmd == "dso": + digest = sr.getDigestForImage(args.list_digest) + else: + digest = sr.getDigestForImage( + "%s/%s" % (args.namespace, args.list_digest) + ) if digest == "": # pragma: nocover sys.exit(1) print(digest.split("@")[1]) elif args.alerts: - filter_product: str = "oci/%s" % cvelib.scan.formatWhereFromNamespace( - args.cmd, args.namespace, oci_where_override + ns = "" + if args.cmd in ["gar", "quay"]: + ns = args.namespace + filter_product: str = "oci/%s" % cvelib.scan.formatWhereFromOCIType( + args.cmd, ns, oci_where_override ) cves = collectCVEData( cveDirs, @@ -2802,7 +2823,7 @@ def formatDate(t: int) -> str: getOCIReports( cves, sr=sr, - namespace=args.namespace, + namespace=ns, images=images, excluded_images=excluded_images, with_templates=args.with_templates, diff --git a/tests/test_dso.py b/tests/test_dso.py index 303d404..6a8d055 100644 --- a/tests/test_dso.py +++ b/tests/test_dso.py @@ -376,8 +376,8 @@ def test__getListEDN(self, mock_post, mock_ednLoadAsDict): # Note, these are listed in reverse order ot the arguments to test_... @mock.patch("cvelib.dso.ednLoadAsDict") @mock.patch("requests.post") - def test_getOCIsForNamespace(self, mock_post, mock_ednLoadAsDict): - """Test getOCIsForNamespace()""" + def test__getOCIsForRepo(self, mock_post, mock_ednLoadAsDict): + """Test _getOCIsForRepo()""" mock_post.return_value = self._mock_response_for_dso(content="edn-doc") mock_ednLoadAsDict.return_value = { "docker-repository-tags": { @@ -389,9 +389,9 @@ def test_getOCIsForNamespace(self, mock_post, mock_ednLoadAsDict): 2023, 8, 7, 6, 5, 4, tzinfo=datetime.timezone.utc ), "docker.image/tags": [ - "1.0-valid-tag", - "1-valid-tag", - "valid-tag", + "1.0-valid-name", + "1-valid-name", + "valid-name", ], } } @@ -403,10 +403,9 @@ def test_getOCIsForNamespace(self, mock_post, mock_ednLoadAsDict): "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" }, } - dsr = cvelib.dso.DockerDSOSecurityReportNew() - res = dsr.getOCIsForNamespace("valid-repo") + res = cvelib.dso._getOCIsForRepo("valid-repo") self.assertEqual(1, len(res)) - self.assertEqual("1.0-valid-tag", res[0][0]) + self.assertEqual("1.0-valid-name", res[0][0]) # should be able to do #self.assertEqual(1691503749, res[0][1]) but # circleci's python gives different epoch (weird). Confirm the datetime @@ -429,9 +428,9 @@ def test_getOCIsForNamespace(self, mock_post, mock_ednLoadAsDict): "docker.image/digest": "sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9", "docker.image/created-at": None, "docker.image/tags": [ - "1.0-valid-tag", - "1-valid-tag", - "valid-tag", + "1.0-valid-name", + "1-valid-name", + "valid-name", ], } } @@ -443,17 +442,15 @@ def test_getOCIsForNamespace(self, mock_post, mock_ednLoadAsDict): "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" }, } - dsr = cvelib.dso.DockerDSOSecurityReportNew() - res = dsr.getOCIsForNamespace("valid-repo") + res = cvelib.dso._getOCIsForRepo("valid-repo") self.assertEqual(1, len(res)) self.assertEqual(0, res[0][1]) # empty mock_post.return_value = self._mock_response_for_dso(content="edn-doc") mock_ednLoadAsDict.return_value = {} - dsr = cvelib.dso.DockerDSOSecurityReportNew() with tests.testutil.capturedOutput() as (output, error): - res = dsr.getOCIsForNamespace("valid-repo") + res = cvelib.dso._getOCIsForRepo("valid-repo") self.assertEqual("", output.getvalue().strip()) self.assertTrue( "Could not find 'docker-repository-tags' as dict in response" @@ -473,15 +470,32 @@ def test_getOCIsForNamespace(self, mock_post, mock_ednLoadAsDict): "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" }, } - dsr = cvelib.dso.DockerDSOSecurityReportNew() with tests.testutil.capturedOutput() as (output, error): - res = dsr.getOCIsForNamespace("valid-repo") + res = cvelib.dso._getOCIsForRepo("valid-repo") self.assertEqual("", output.getvalue().strip()) self.assertTrue( "Could not find 'image' in response for image" in error.getvalue().strip() ) self.assertEqual(0, len(res)) + # bad invocation + with mock.patch.object( + cvelib.common.error, + "__defaults__", + ( + 1, + False, + ), + ): + with tests.testutil.capturedOutput() as (output, error): + res = cvelib.dso._getOCIsForRepo("valid-repo:dont-use-tag") + self.assertEqual("", output.getvalue().strip()) + self.assertTrue( + "Please use REPO (without :TAG or @sha256:SHA256)" + in error.getvalue().strip() + ) + self.assertEqual(0, len(res)) + # Note, these are listed in reverse order ot the arguments to test_... @mock.patch("cvelib.dso.ednLoadAsDict") @mock.patch("requests.post") @@ -498,9 +512,9 @@ def test_getDigestForImage(self, mock_post, mock_ednLoadAsDict): 2023, 8, 8, 9, 9, 9, tzinfo=datetime.timezone.utc ), "docker.image/tags": [ - "1.0-valid-tag", - "1-valid-tag", - "valid-tag", + "1.0-valid-name", + "1-valid-name", + "valid-name", ], } } @@ -512,25 +526,39 @@ def test_getDigestForImage(self, mock_post, mock_ednLoadAsDict): "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" }, } + + # with tag dsr = cvelib.dso.DockerDSOSecurityReportNew() - res = dsr.getDigestForImage("valid-repo/valid-tag") + res = dsr.getDigestForImage("valid-repo:valid-name") self.assertEqual( - "valid-repo/valid-tag@sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9", + "valid-repo@sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9", res, ) - # bad invocation - with tests.testutil.capturedOutput() as (output, error): - dsr.getDigestForImage("valid-repo") - self.assertEqual("", output.getvalue().strip()) - self.assertTrue("Please use REPO/TAG" in error.getvalue().strip()) + # with sha256 + dsr = cvelib.dso.DockerDSOSecurityReportNew() + res = dsr.getDigestForImage( + "valid-repo@sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9" + ) + self.assertEqual( + "valid-repo@sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9", + res, + ) + + # bare + dsr = cvelib.dso.DockerDSOSecurityReportNew() + res = dsr.getDigestForImage("valid-repo") + self.assertEqual( + "valid-repo@sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9", + res, + ) # empty mock_post.return_value = self._mock_response_for_dso(content="edn-doc") mock_ednLoadAsDict.return_value = {} dsr = cvelib.dso.DockerDSOSecurityReportNew() with tests.testutil.capturedOutput() as (output, error): - res = dsr.getDigestForImage("valid-repo/valid-tag") + res = dsr.getDigestForImage("valid-repo:valid-name") self.assertEqual("", output.getvalue().strip()) self.assertTrue( "Could not find 'docker-repository-tags' as dict in response" @@ -552,7 +580,7 @@ def test_getDigestForImage(self, mock_post, mock_ednLoadAsDict): } dsr = cvelib.dso.DockerDSOSecurityReportNew() with tests.testutil.capturedOutput() as (output, error): - res = dsr.getDigestForImage("valid-repo/valid-tag") + res = dsr.getDigestForImage("valid-repo:valid-name") self.assertEqual("", output.getvalue().strip()) self.assertTrue( "Could not find 'image' in response for image" in error.getvalue().strip() @@ -582,7 +610,7 @@ def test_getDigestForImage(self, mock_post, mock_ednLoadAsDict): }, } with tests.testutil.capturedOutput() as (output, error): - res = dsr.getDigestForImage("valid-repo/valid-tag") + res = dsr.getDigestForImage("valid-repo:valid-name") self.assertEqual("", output.getvalue().strip()) self.assertEqual("", error.getvalue().strip()) self.assertEqual(0, len(res)) @@ -591,10 +619,10 @@ def test_parseImageDigest(self): """Test parseImageDigest""" tsts = [ # org, repo, sha256, expErr - ("valid-repo", "valid-tag", "sha256:deadbeef", ""), - ("valid-repo", "valid-tag", "bad", "does not contain '@sha256:"), - ("valid-repo", "valid-tag", "@sha256:@", "should have 1 '@'"), - ("valid-repo", "valid-tag/bad", "sha256:deadbeef", "should have 1 '/'"), + ("", "valid-name", "sha256:deadbeef", ""), + ("ignored", "valid-name", "sha256:deadbeef", ""), + ("", "valid-name", "bad", "does not contain '@sha256:"), + ("", "valid-name", "@sha256:@", "should have 1 '@'"), ] dsr = cvelib.dso.DockerDSOSecurityReportNew() with mock.patch.object( @@ -606,7 +634,7 @@ def test_parseImageDigest(self): ), ): for org, repo, sha, expErr in tsts: - digest = "%s/%s@%s" % (org, repo, sha) + digest = "%s@%s" % (repo, sha) with tests.testutil.capturedOutput() as (output, error): r1, r2, r3 = dsr.parseImageDigest(digest) @@ -619,7 +647,7 @@ def test_parseImageDigest(self): else: self.assertEqual("", output.getvalue().strip()) self.assertEqual("", error.getvalue().strip()) - self.assertEqual(org, r1) + self.assertEqual("", r1) self.assertEqual(repo, r2) self.assertEqual(sha, r3) @@ -755,7 +783,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): _, d = self._validDockerDSOReport() mock_fetchVulnReports.return_value = d dsr = cvelib.dso.DockerDSOSecurityReportNew() - res, resMsg = dsr.fetchScanReport("valid-repo/valid-tag@sha256:deadbeef") + res, resMsg = dsr.fetchScanReport("valid-name@sha256:deadbeef") self.assertEqual("", resMsg) self.assertEqual(2, len(res)) self.assertEqual("pkg:something", res[0].component) @@ -769,7 +797,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): self.assertEqual("high", res[0].severity) self.assertEqual("needed", res[0].status) self.assertEqual( - "https://dso.docker.com/images/valid-repo/digests/sha256:deadbeef", + "https://dso.docker.com/images/valid-name/digests/sha256:deadbeef", res[0].url, ) self.assertEqual("pkg:something", res[1].component) @@ -783,7 +811,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): self.assertEqual("high", res[1].severity) self.assertEqual("needed", res[1].status) self.assertEqual( - "https://dso.docker.com/images/valid-repo/digests/sha256:deadbeef", + "https://dso.docker.com/images/valid-name/digests/sha256:deadbeef", res[1].url, ) @@ -794,9 +822,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): _, d = self._validDockerDSOReport() d["data"]["vulnerabilitiesByPackage"][0]["vulnerabilities"][0]["fixedBy"] = None mock_fetchVulnReports.return_value = d - res, resMsg = dsr.fetchScanReport( - "valid-repo/valid-tag@sha256:deadbeef", fixable=False - ) + res, resMsg = dsr.fetchScanReport("valid-name@sha256:deadbeef", fixable=False) self.assertEqual("", resMsg) self.assertEqual(2, len(res)) self.assertEqual("pkg:something", res[0].component) @@ -809,9 +835,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): self.assertEqual("/path/2", res[1].detectedIn) # fixable=True - res, resMsg = dsr.fetchScanReport( - "valid-repo/valid-tag@sha256:deadbeef", fixable=True - ) + res, resMsg = dsr.fetchScanReport("valid-name@sha256:deadbeef", fixable=True) self.assertEqual(0, len(res)) self.assertEqual("No problems found", resMsg) @@ -819,7 +843,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): _, d = self._validDockerDSOReport() mock_fetchVulnReports.return_value = d res, resMsg = dsr.fetchScanReport( - "valid-repo/valid-tag@sha256:deadbeef", + "valid-name@sha256:deadbeef", priorities=["negligible"], ) self.assertEqual(0, len(res)) @@ -835,7 +859,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): ] = "NEGLIGIBLE" mock_fetchVulnReports.return_value = d res, resMsg = dsr.fetchScanReport( - "valid-repo/valid-tag@sha256:deadbeef", + "valid-name@sha256:deadbeef", priorities=["negligible"], ) self.assertEqual("", resMsg) @@ -850,20 +874,16 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): json_data=self._validDockerDSOPackageURLs() ) _, d = self._validDockerDSOReport() - res, resMsg = dsr.fetchScanReport( - "valid-repo/valid-tag@sha256:deadbeef", raw=True - ) + res, resMsg = dsr.fetchScanReport("valid-name@sha256:deadbeef", raw=True) exp = '"purl": "pkg:something@1.0.0",' self.assertEqual(0, len(res)) self.assertTrue(exp in resMsg) # bad invocation with tests.testutil.capturedOutput() as (output, error): - res, resMsg = dsr.fetchScanReport("valid-repo/valid-tag") + res, resMsg = dsr.fetchScanReport("valid-name") self.assertEqual("", output.getvalue().strip()) - self.assertTrue( - "Please use REPO/TAG@sha256:" in error.getvalue().strip() - ) + self.assertTrue("Please use REPO@sha256:SHA256" in error.getvalue().strip()) self.assertEqual(0, len(res)) self.assertEqual("", resMsg) @@ -875,7 +895,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): del d["data"] mock_fetchVulnReports.return_value = d with tests.testutil.capturedOutput() as (output, error): - res, resMsg = dsr.fetchScanReport("valid-repo/valid-tag@sha256:deadbeef") + res, resMsg = dsr.fetchScanReport("valid-name@sha256:deadbeef") self.assertEqual("", output.getvalue().strip()) self.assertTrue("Could not find 'data' in" in error.getvalue().strip()) self.assertEqual(0, len(res)) @@ -888,7 +908,7 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): del d["data"]["vulnerabilitiesByPackage"] mock_fetchVulnReports.return_value = d with tests.testutil.capturedOutput() as (output, error): - res, resMsg = dsr.fetchScanReport("valid-repo/valid-tag@sha256:deadbeef") + res, resMsg = dsr.fetchScanReport("valid-name@sha256:deadbeef") self.assertEqual("", output.getvalue().strip()) self.assertTrue( "Could not find 'vulnerabilitiesByPackage' in" in error.getvalue().strip() @@ -899,10 +919,10 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): # Note, these are listed in reverse order ot the arguments to test_... @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.fetchScanReport") @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.getDigestForImage") - @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.getOCIsForNamespace") + @mock.patch("cvelib.dso._getOCIsForRepo") def test_main_dso_dump_reports( self, - mock_getOCIsForNamespace, + mock__getOCIsForRepo, mock_getDigestForImage, mock_fetchScanReport, ): @@ -910,8 +930,8 @@ def test_main_dso_dump_reports( self.tmpdir = tempfile.mkdtemp(prefix="sedg-") os.environ["SEDG_EXPERIMENTAL"] = "1" - mock_getOCIsForNamespace.return_value = [("valid-tag", 1684472852)] - mock_getDigestForImage.return_value = "valid-repo/valid-tag@sha256:deadbeef" + mock__getOCIsForRepo.return_value = [("valid-name", 1684472852)] + mock_getDigestForImage.return_value = "valid-name@sha256:deadbeef" mock_fetchScanReport.return_value = ( [], '{"data": {"vulnerabilitiesByPackage": []}}', @@ -932,10 +952,10 @@ def test_main_dso_dump_reports( cvelib.dso.main_dso_dump_reports() today = datetime.datetime.now() - fn = ( - self.tmpdir - + "/subdir/%d/%0.2d/%0.2d/dso/valid-repo/valid-tag/deadbeef.json" - % (today.year, today.month, today.day) + fn = self.tmpdir + "/subdir/%d/%0.2d/%0.2d/dso/valid-repo/deadbeef.json" % ( + today.year, + today.month, + today.day, ) relfn = os.path.relpath(fn, self.tmpdir + "/subdir") self.assertEqual("Created: %s" % relfn, output.getvalue().strip()) @@ -962,11 +982,11 @@ def test_main_dso_dump_reports( os.unlink(fn) # duplicate (write out equivalent of json.dumps(..., sort_keys=True)) - fn = self.tmpdir + "/subdir/YYYY/MM/DD/dso/valid-repo/valid-tag/deadbeef.json" + fn = self.tmpdir + "/subdir/YYYY/MM/DD/dso/valid-repo/deadbeef.json" os.makedirs(os.path.dirname(fn)) with open(fn, "w") as fh: fh.write('{\n "data": {\n "vulnerabilitiesByPackage": []\n }\n}\n') - fn2 = self.tmpdir + "/subdir/YYYY/MM/dd/dso/valid-repo/valid-tag/deadbeef.json" + fn2 = self.tmpdir + "/subdir/YYYY/MM/dd/dso/valid-repo/deadbeef.json" os.makedirs(os.path.dirname(fn2)) with open(fn2, "w") as fh: fh.write('{\n "data": {\n "vulnerabilitiesByPackage": []\n }\n}\n') @@ -989,10 +1009,10 @@ def test_main_dso_dump_reports( # Note, these are listed in reverse order ot the arguments to test_... @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.fetchScanReport") @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.getDigestForImage") - @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.getOCIsForNamespace") + @mock.patch("cvelib.dso._getOCIsForRepo") def test_main_dso_dump_reports_bad( self, - mock_getOCIsForNamespace, + mock__getOCIsForRepo, mock_getDigestForImage, mock_fetchScanReport, ): @@ -1001,7 +1021,7 @@ def test_main_dso_dump_reports_bad( os.environ["SEDG_EXPERIMENTAL"] = "1" # no image names - mock_getOCIsForNamespace.return_value = [] + mock__getOCIsForRepo.return_value = [] with mock.patch.object( cvelib.common.error, "__defaults__", @@ -1028,7 +1048,7 @@ def test_main_dso_dump_reports_bad( ) # no digests - mock_getOCIsForNamespace.return_value = [("valid-tag", 1684472852)] + mock__getOCIsForRepo.return_value = [("valid-name", 1684472852)] mock_getDigestForImage.return_value = "" with mock.patch.object( cvelib.common.error, @@ -1052,15 +1072,14 @@ def test_main_dso_dump_reports_bad( cvelib.dso.main_dso_dump_reports() self.assertEqual("", output.getvalue().strip()) self.assertTrue( - "WARN: Could not find digest for valid-repo/valid-tag" - in error.getvalue().strip(), + "WARN: Could not find digest for valid-repo" in error.getvalue().strip(), ) self.assertTrue( "Could not find any OCI image digests" in error.getvalue().strip(), ) - mock_getOCIsForNamespace.return_value = [("valid-tag", 1684472852)] - mock_getDigestForImage.return_value = "valid-repo/valid-tag@sha256:deadbeef" + mock__getOCIsForRepo.return_value = [("valid-name", 1684472852)] + mock_getDigestForImage.return_value = "valid-name@sha256:deadbeef" mock_fetchScanReport.return_value = [], "" with mock.patch.object( cvelib.common.error, @@ -1086,8 +1105,8 @@ def test_main_dso_dump_reports_bad( self.assertTrue("No new security reports" in error.getvalue().strip()) # unsupported scan status - mock_getOCIsForNamespace.return_value = [("valid-tag", 1684472852)] - mock_getDigestForImage.return_value = "valid-repo/valid-tag@sha256:deadbeef" + mock__getOCIsForRepo.return_value = [("valid-name", 1684472852)] + mock_getDigestForImage.return_value = "valid-name@sha256:deadbeef" mock_fetchScanReport.return_value = ([], '{"data": null}') with mock.patch.object( cvelib.common.error, diff --git a/tests/test_report.py b/tests/test_report.py index e2e4939..a7aeae1 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -4100,59 +4100,35 @@ def test__main_report_parse_args(self): # dso ( ["dso"], - "Please specify one of --alerts, --list or --list-digest with 'dso'", + "Please specify --alerts or --list-digest with 'dso'", ), ( - ["dso", "--list", "--namespace", "foo/bad"], - "--namespace 'foo/bad' should not contain '/'", - ), - ( - ["dso", "--list", "--namespace", "foo", "--raw"], + ["dso", "--list-digest", "foo", "--raw"], "Please specify --alerts with --raw", ), ( - ["dso", "--list", "--namespace", "foo", "--all"], + ["dso", "--list-digest", "foo", "--all"], "Please specify --alerts with --all", ), ( - ["dso", "--list", "--namespace", "foo", "--with-templates"], + ["dso", "--list-digest", "foo", "--with-templates"], "Please specify --alerts with --with-templates", ), ( [ "dso", "--alerts", - "--namespace", - "foo", - "--images", - "img@sha256:deadbeef", - "--list", - ], - "Unsupported option --list with --alerts", - ), - ( - [ - "dso", - "--alerts", - "--namespace", - "foo", "--images", "img@sha256:deadbeef", "--list-digest", - "foo/img", + "foo", ], "Unsupported option --list-digest with --alerts", ), - ( - ["dso", "--alerts"], - "Please specify --namespace with 'dso'", - ), ( [ "dso", "--alerts", - "--namespace", - "foo", "--images", "img@sha256:deadbeef", "--raw", @@ -4164,8 +4140,6 @@ def test__main_report_parse_args(self): [ "dso", "--alerts", - "--namespace", - "foo", "--images", "img@sha256:deadbeef", "--raw", @@ -4177,8 +4151,6 @@ def test__main_report_parse_args(self): [ "dso", "--alerts", - "--namespace", - "foo", ], "Please specify --images or --excluded-images with --alerts", ), @@ -4593,7 +4565,7 @@ def _getValidScanOCI(self, quay=False, gar=False, dso=False): elif gar: url = "https://us-docker.pkg.dev/valid-proj/valid-repo/valid-name@sha256:deadbeef" elif dso: - url = "https://dso.docker.com/images/valid-org/digests/sha256:deadbeef" + url = "https://dso.docker.com/images/valid-repo/digests/sha256:deadbeef" data = { "component": "foo", @@ -5297,38 +5269,13 @@ def test_main_report_gar_alerts_mixed(self, mock_fetchScanReport): self.assertEqual(res, exp) - @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.getOCIsForNamespace") - def test_main_report_dso_list(self, mock_getOCIsForNamespace): - """Test main_report - dso --list""" - self._mock_cve_data_mixed() # for cveDirs - os.environ["SEDG_EXPERIMENTAL"] = "1" - mock_getOCIsForNamespace.return_value = [("valid-repo", 1684472852)] - args = ["dso", "--list", "--namespace", "valid-org"] - with tests.testutil.capturedOutput() as (output, error): - cvelib.report.main_report(args) - self.assertEqual("", error.getvalue().strip()) - self.assertEqual( - "valid-org/valid-repo (last updated: 2023-05-19 05:07:32)", - output.getvalue().strip(), - ) - - mock_getOCIsForNamespace.return_value = [("empty-repo", 0)] - args = ["dso", "--list", "--namespace", "valid-org"] - with tests.testutil.capturedOutput() as (output, error): - cvelib.report.main_report(args) - self.assertEqual("", error.getvalue().strip()) - self.assertEqual( - "valid-org/empty-repo (last updated: unknown)", - output.getvalue().strip(), - ) - @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.getDigestForImage") def test_main_report_dso_list_digest(self, mock_getDigestForImage): """Test main_report - dso --list-digest""" self._mock_cve_data_mixed() # for cveDirs os.environ["SEDG_EXPERIMENTAL"] = "1" - mock_getDigestForImage.return_value = "valid-org/valid-repo@sha256:deadbeef" - args = ["dso", "--list-digest", "valid-repo", "--namespace", "valid-org"] + mock_getDigestForImage.return_value = "valid-repo@sha256:deadbeef" + args = ["dso", "--list-digest", "valid-repo"] with tests.testutil.capturedOutput() as (output, error): cvelib.report.main_report(args) self.assertEqual("", error.getvalue().strip()) @@ -5347,8 +5294,6 @@ def test_main_report_dso_alerts(self, mock_fetchScanReport, mock_getDigestForIma args = [ "dso", "--alerts", - "--namespace", - "valid-org", "--images", "valid-repo@sha256:deadbeef", ] @@ -5356,18 +5301,16 @@ def test_main_report_dso_alerts(self, mock_fetchScanReport, mock_getDigestForIma cvelib.report.main_report(args) self.assertEqual("", error.getvalue().strip()) self.assertTrue( - "# New reports\n\nvalid-org/valid-repo report: 1" in output.getvalue(), + "# New reports\n\nvalid-repo report: 1" in output.getvalue(), msg="output is:\n%s" % output.getvalue().strip(), ) # without image digest mock_fetchScanReport.return_value = [self._getValidScanOCI(dso=True)], "" - mock_getDigestForImage.return_value = "valid-org/valid-repo@sha256:deadbeef0123" + mock_getDigestForImage.return_value = "valid-repo@sha256:deadbeef0123" args = [ "dso", "--alerts", - "--namespace", - "valid-org", "--images", "valid-repo", ] @@ -5375,7 +5318,7 @@ def test_main_report_dso_alerts(self, mock_fetchScanReport, mock_getDigestForIma cvelib.report.main_report(args) self.assertEqual("", error.getvalue().strip()) self.assertTrue( - "# New reports\n\nvalid-org/valid-repo report: 1" in output.getvalue(), + "# New reports\n\nvalid-repo report: 1" in output.getvalue(), msg="output is:\n%s" % output.getvalue().strip(), ) @@ -5384,8 +5327,6 @@ def test_main_report_dso_alerts(self, mock_fetchScanReport, mock_getDigestForIma args = [ "dso", "--alerts", - "--namespace", - "valid-org", "--images", "valid-repo@sha256:deadbeef", ] @@ -5400,8 +5341,6 @@ def test_main_report_dso_alerts(self, mock_fetchScanReport, mock_getDigestForIma args = [ "dso", "--alerts", - "--namespace", - "valid-org", "--images", "valid-repo", ] @@ -5425,8 +5364,6 @@ def test_main_report_dso_alerts(self, mock_fetchScanReport, mock_getDigestForIma args = [ "dso", "--alerts", - "--namespace", - "valid-org", "--images", "valid-repo/bad@sha256:deadbeef", ] From 3fd9bc9f77c3f2c5b04d1554c48c061fb510771e Mon Sep 17 00:00:00 2001 From: Jamie Strandboge Date: Fri, 18 Aug 2023 14:55:01 -0500 Subject: [PATCH 5/6] fix: adjust scan matching to require detectedIn match --- cvelib/scan.py | 16 +++++++++------- tests/test_scan.py | 9 +++------ 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/cvelib/scan.py b/cvelib/scan.py index ac29edb..0cf81fa 100644 --- a/cvelib/scan.py +++ b/cvelib/scan.py @@ -145,12 +145,15 @@ def matches(self, b: "ScanOCI") -> Tuple[bool, bool]: """Test if self and b match in meaningful ways. Returns fuzzy and precise tuple """ - if self.advisory != b.advisory or self.component != b.component: + if ( + self.advisory != b.advisory + or self.component != b.component + or self.detectedIn != b.detectedIn + ): return False, False if ( - self.detectedIn != b.detectedIn - or self.versionAffected != b.versionAffected + self.versionAffected != b.versionAffected or self.versionFixed != b.versionFixed or self.severity != b.severity ): @@ -168,8 +171,8 @@ def _diff(a: "ScanOCI", b: "ScanOCI", attrib: str, precise: bool): if attrib == "versionFixed": attrib_p = "fixedBy" - # only show diff for versions, detectedIn and severity (the fuzzy - # matching parts) + # only show diff for versions and severity (the fuzzy matching + # parts) if getattr(a, attrib) == getattr(b, attrib) or ( not precise and attrib @@ -177,7 +180,6 @@ def _diff(a: "ScanOCI", b: "ScanOCI", attrib: str, precise: bool): "versionAffected", "versionFixed", "severity", - "detectedIn", ] ): return " %s: %s\n" % (attrib_p, getattr(a, attrib)) @@ -515,7 +517,7 @@ def getScanOCIsReportTemplates( cve_items: Dict[str, int] = {} scan_reports: str = "" highest: int = 0 - for oci in sorted(ocis, key=lambda i: (i.component, i.advisory)): + for oci in sorted(ocis, key=lambda i: (i.component, i.advisory, i.detectedIn)): cur: int = sev.index(oci.severity) if cur > highest: highest = cur diff --git a/tests/test_scan.py b/tests/test_scan.py index 2fe4e4e..a65247c 100644 --- a/tests/test_scan.py +++ b/tests/test_scan.py @@ -442,7 +442,6 @@ def test_diff(self): b_diff["fixedBy"] = "1.2.4" b_diff["severity"] = "low" b_diff["status"] = "needs-triage" - b_diff["detectedIn"] = "Other Distro" tsts = [ # a, b, precise, expected @@ -466,8 +465,7 @@ def test_diff(self): False, """ - type: oci component: foo -- detectedIn: Some Distro -+ detectedIn: Other Distro + detectedIn: Some Distro advisory: https://www.cve.org/CVERecord?id=CVE-2023-0001 version: 1.2.2 - fixedBy: 1.2.3 @@ -483,8 +481,7 @@ def test_diff(self): True, """ - type: oci component: foo -- detectedIn: Some Distro -+ detectedIn: Other Distro + detectedIn: Some Distro advisory: https://www.cve.org/CVERecord?id=CVE-2023-0001 version: 1.2.2 - fixedBy: 1.2.3 @@ -499,7 +496,7 @@ def test_diff(self): for a, b, precise, exp in tsts: res = a.diff(b, precise=precise) - self.assertEqual(exp, res) + self.assertEqual(exp, res, msg=res) def test_parse(self): """Test parse()""" From abc66e663707114ef41bf71a8037eaa91c7de3aa Mon Sep 17 00:00:00 2001 From: Jamie Strandboge Date: Fri, 18 Aug 2023 15:10:51 -0500 Subject: [PATCH 6/6] chore: rename _getOCIsForRepo as _getTagsForRepo() --- cvelib/dso.py | 26 +++++++++++++------------- tests/test_dso.py | 32 ++++++++++++++++---------------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/cvelib/dso.py b/cvelib/dso.py index ee0f2e9..47a96e0 100644 --- a/cvelib/dso.py +++ b/cvelib/dso.py @@ -202,7 +202,7 @@ def getDigestForImage(self, repo_full: str) -> str: return "" def parseImageDigest(self, digest: str) -> Tuple[str, str, str]: - """Parse the image digest into a (namespace, repo, sha256) tuple""" + """Parse the image digest into a (namespace (ignored), repo, sha256) tuple""" if "@sha256:" not in digest: error("Malformed digest '%s' (does not contain '@sha256:')" % digest) return ("", "", "") @@ -210,16 +210,12 @@ def parseImageDigest(self, digest: str) -> Tuple[str, str, str]: error("Malformed digest '%s' (should have 1 '@')" % digest) return ("", "", "") - sha256: str = "" + repo: str + sha256: str repo, sha256 = digest.split("@") return ("", repo, sha256) - def getOCIsForNamespace(self, _: str) -> List[Tuple[str, int]]: # pragma: nocover - """Obtain the list of DockerDSO repos for the specified namespace""" - # dso doesn't have a concept of namespaces - raise NotImplementedError - def fetchScanReport( self, repo_full: str, @@ -270,6 +266,10 @@ def fetchScanReport( return ocis, "" + def getOCIsForNamespace(self, _: str) -> List[Tuple[str, int]]: # pragma: nocover + # dso doesn't have a concept of namespaces + raise NotImplementedError + def getReposForNamespace(self, _: str) -> List[str]: # pragma: nocover # dso doesn't have a concept of repos within namespaces raise NotImplementedError @@ -752,7 +752,7 @@ def _getListEDN(namespace: str, days: int = 365) -> Dict: # "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" # } # } -def _getOCIsForRepo(repo_name: str) -> List[Tuple[str, int]]: +def _getTagsForRepo(repo_name: str) -> List[Tuple[str, int]]: """Obtain the list of DockerDSO tags for the specified repo""" if ":" in repo_name or "@" in repo_name or "/" in repo_name: error("Please use REPO (without :TAG or @sha256:SHA256)") @@ -810,10 +810,10 @@ def main_dso_dump_reports(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=textwrap.dedent( """\ -dso-dump-reports pulls all the latest security reports for OCI images in -REPO and outputs them to: +dso-dump-reports pulls all the latest security reports for the tagged images in +the REPO and outputs them to: - /path/to/reports/YY/MM/DD/dso/REPO/TAG/SHA256.json + /path/to/reports/YY/MM/DD/dso/REPO/SHA256.json Eg, to pull all dso security scan reports for org 'foo': @@ -843,7 +843,7 @@ def main_dso_dump_reports(): sr = DockerDSOSecurityReportNew() # Find latest digest for all images - oci_names: List[Tuple[str, int]] = _getOCIsForRepo(args.name) + oci_names: List[Tuple[str, int]] = _getTagsForRepo(args.name) if len(oci_names) == 0: error("Could not enumerate any OCI image names") return # for tests @@ -872,7 +872,7 @@ def main_dso_dump_reports(): # dso doesn't have dates or times in the security report, so we will # store them in a folder under today's date. Since the report path comes # from the date the report was fetched, we'll first search for the report - # by the dso/TAG/SHA256.json to see if we previously downloaded it. + # by the dso/REPO/SHA256.json to see if we previously downloaded it. # gather a list of potentially matching filenames json_files: Dict[str, str] = {} diff --git a/tests/test_dso.py b/tests/test_dso.py index 6a8d055..dae9772 100644 --- a/tests/test_dso.py +++ b/tests/test_dso.py @@ -376,8 +376,8 @@ def test__getListEDN(self, mock_post, mock_ednLoadAsDict): # Note, these are listed in reverse order ot the arguments to test_... @mock.patch("cvelib.dso.ednLoadAsDict") @mock.patch("requests.post") - def test__getOCIsForRepo(self, mock_post, mock_ednLoadAsDict): - """Test _getOCIsForRepo()""" + def test__getTagsForRepo(self, mock_post, mock_ednLoadAsDict): + """Test _getTagsForRepo()""" mock_post.return_value = self._mock_response_for_dso(content="edn-doc") mock_ednLoadAsDict.return_value = { "docker-repository-tags": { @@ -403,7 +403,7 @@ def test__getOCIsForRepo(self, mock_post, mock_ednLoadAsDict): "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" }, } - res = cvelib.dso._getOCIsForRepo("valid-repo") + res = cvelib.dso._getTagsForRepo("valid-repo") self.assertEqual(1, len(res)) self.assertEqual("1.0-valid-name", res[0][0]) @@ -442,7 +442,7 @@ def test__getOCIsForRepo(self, mock_post, mock_ednLoadAsDict): "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b" }, } - res = cvelib.dso._getOCIsForRepo("valid-repo") + res = cvelib.dso._getTagsForRepo("valid-repo") self.assertEqual(1, len(res)) self.assertEqual(0, res[0][1]) @@ -450,7 +450,7 @@ def test__getOCIsForRepo(self, mock_post, mock_ednLoadAsDict): mock_post.return_value = self._mock_response_for_dso(content="edn-doc") mock_ednLoadAsDict.return_value = {} with tests.testutil.capturedOutput() as (output, error): - res = cvelib.dso._getOCIsForRepo("valid-repo") + res = cvelib.dso._getTagsForRepo("valid-repo") self.assertEqual("", output.getvalue().strip()) self.assertTrue( "Could not find 'docker-repository-tags' as dict in response" @@ -471,7 +471,7 @@ def test__getOCIsForRepo(self, mock_post, mock_ednLoadAsDict): }, } with tests.testutil.capturedOutput() as (output, error): - res = cvelib.dso._getOCIsForRepo("valid-repo") + res = cvelib.dso._getTagsForRepo("valid-repo") self.assertEqual("", output.getvalue().strip()) self.assertTrue( "Could not find 'image' in response for image" in error.getvalue().strip() @@ -488,7 +488,7 @@ def test__getOCIsForRepo(self, mock_post, mock_ednLoadAsDict): ), ): with tests.testutil.capturedOutput() as (output, error): - res = cvelib.dso._getOCIsForRepo("valid-repo:dont-use-tag") + res = cvelib.dso._getTagsForRepo("valid-repo:dont-use-tag") self.assertEqual("", output.getvalue().strip()) self.assertTrue( "Please use REPO (without :TAG or @sha256:SHA256)" @@ -919,10 +919,10 @@ def test_fetchScanReport(self, mock_post, mock_fetchVulnReports): # Note, these are listed in reverse order ot the arguments to test_... @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.fetchScanReport") @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.getDigestForImage") - @mock.patch("cvelib.dso._getOCIsForRepo") + @mock.patch("cvelib.dso._getTagsForRepo") def test_main_dso_dump_reports( self, - mock__getOCIsForRepo, + mock__getTagsForRepo, mock_getDigestForImage, mock_fetchScanReport, ): @@ -930,7 +930,7 @@ def test_main_dso_dump_reports( self.tmpdir = tempfile.mkdtemp(prefix="sedg-") os.environ["SEDG_EXPERIMENTAL"] = "1" - mock__getOCIsForRepo.return_value = [("valid-name", 1684472852)] + mock__getTagsForRepo.return_value = [("valid-name", 1684472852)] mock_getDigestForImage.return_value = "valid-name@sha256:deadbeef" mock_fetchScanReport.return_value = ( [], @@ -1009,10 +1009,10 @@ def test_main_dso_dump_reports( # Note, these are listed in reverse order ot the arguments to test_... @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.fetchScanReport") @mock.patch("cvelib.dso.DockerDSOSecurityReportNew.getDigestForImage") - @mock.patch("cvelib.dso._getOCIsForRepo") + @mock.patch("cvelib.dso._getTagsForRepo") def test_main_dso_dump_reports_bad( self, - mock__getOCIsForRepo, + mock__getTagsForRepo, mock_getDigestForImage, mock_fetchScanReport, ): @@ -1021,7 +1021,7 @@ def test_main_dso_dump_reports_bad( os.environ["SEDG_EXPERIMENTAL"] = "1" # no image names - mock__getOCIsForRepo.return_value = [] + mock__getTagsForRepo.return_value = [] with mock.patch.object( cvelib.common.error, "__defaults__", @@ -1048,7 +1048,7 @@ def test_main_dso_dump_reports_bad( ) # no digests - mock__getOCIsForRepo.return_value = [("valid-name", 1684472852)] + mock__getTagsForRepo.return_value = [("valid-name", 1684472852)] mock_getDigestForImage.return_value = "" with mock.patch.object( cvelib.common.error, @@ -1078,7 +1078,7 @@ def test_main_dso_dump_reports_bad( "Could not find any OCI image digests" in error.getvalue().strip(), ) - mock__getOCIsForRepo.return_value = [("valid-name", 1684472852)] + mock__getTagsForRepo.return_value = [("valid-name", 1684472852)] mock_getDigestForImage.return_value = "valid-name@sha256:deadbeef" mock_fetchScanReport.return_value = [], "" with mock.patch.object( @@ -1105,7 +1105,7 @@ def test_main_dso_dump_reports_bad( self.assertTrue("No new security reports" in error.getvalue().strip()) # unsupported scan status - mock__getOCIsForRepo.return_value = [("valid-name", 1684472852)] + mock__getTagsForRepo.return_value = [("valid-name", 1684472852)] mock_getDigestForImage.return_value = "valid-name@sha256:deadbeef" mock_fetchScanReport.return_value = ([], '{"data": null}') with mock.patch.object(