Skip to content

Commit

Permalink
Merge pull request #122 from influxdata/jdstrand/dont-use-ns-with-dso
Browse files Browse the repository at this point in the history
fix: --list and --namespace not supported with dso
  • Loading branch information
jdstrand committed Aug 18, 2023
2 parents 0d7a70c + abc66e6 commit 56baaf9
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 331 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@
$ cve-report quay --alerts --name <org>/<name>@<digest>
# Docker DSO container security reports
$ cve-report dso --list <repo>
$ cve-report dso --list-digest <repo>/<tag>
$ cve-report dso --alerts --name <repo>/<tag>@<digest>
$ cve-report dso --list-digest <repo>[:<tag>|@<sha256>]
$ cve-report dso --alerts --name <repo>[:<tag>|@<sha256>]
# if desired, leave the venv
$ deactivate
Expand Down
212 changes: 112 additions & 100 deletions cvelib/dso.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,114 +167,54 @@ class DockerDSOSecurityReportNew(SecurityReportInterface):
# }
def getDigestForImage(self, repo_full: str) -> str:
"""Obtain the digest for the the specified repo"""
if "/" not in repo_full:
error("Please use REPO/TAG", do_exit=False)
return ""

ns: str = ""
name: str = ""
ns, name = repo_full.split("/", 2)
name = repo_full
sha256: str = ""
tagsearch: str = ""
if "@sha256:" in name:
name, sha256 = name.split("@", 2)
elif ":" in name:
name, tagsearch = name.split(":", 2)

rese = _getListEDN(ns)
rese = _getListEDN(name)
if len(rese) < 1: # error condition from _getListEDN()
return ""

digest: str = ""
latest_d: Union[None, datetime] = None
for img in rese["docker-repository-tags"]["data"]:
if "image" in img and "docker.image/tags" in img["image"]:
if name in img["image"]["docker.image/tags"]:
digest = img["image"]["docker.image/digest"]
if sha256 != "" and sha256 == img["image"]["docker.image/digest"]:
digest = sha256
break
elif (
tagsearch != "" and tagsearch in img["image"]["docker.image/tags"]
) or tagsearch == "":
if (
latest_d is None
or img["image"]["docker.image/created-at"] > latest_d
):
digest = img["image"]["docker.image/digest"]
latest_d = img["image"]["docker.image/created-at"]

if digest != "":
return "%s/%s@%s" % (ns, name, digest)
return "%s@%s" % (name, digest)

return ""

def parseImageDigest(self, digest: str) -> Tuple[str, str, str]:
"""Parse the image digest into a (namespace, repo, sha256) tuple"""
"""Parse the image digest into a (namespace (ignored), repo, sha256) tuple"""
if "@sha256:" not in digest:
error("Malformed digest '%s' (does not contain '@sha256:')" % digest)
return ("", "", "")
elif digest.count("@") != 1:
error("Malformed digest '%s' (should have 1 '@')" % digest)
return ("", "", "")

tmp: str = ""
sha256: str = ""
tmp, sha256 = digest.split("@")

if tmp.count("/") != 1:
error("Malformed digest '%s' (should have 1 '/')" % digest)
return ("", "", "")

ns: str = ""
repo: str = ""
ns, repo = tmp.split("/")
return (ns, repo, sha256)

# $ curl -X POST https://api.dso.docker.com/datalog/shared-vulnerability/queries
# --data-binary '_getListEDN()'
# {
# "docker-repository-tags": {
# "data": [
# {
# "image": {
# "docker.image/digest": "sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9",
# "docker.image/created-at": "2022-12-16 00:23:40+00:00",
# "docker.image/tags": [
# "1.0-foo",
# "1-foo",
# "foo"
# ]
# }
# }
# ],
# "basis-t": "12345678",
# "tx": "12345678901234"
# },
# "extensions": {
# "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b"
# }
# }
def getOCIsForNamespace(self, namespace: str) -> List[Tuple[str, int]]:
"""Obtain the list of DockerDSO repos for the specified namespace"""
if sys.stdout.isatty():
print("Fetching list of repos:", end="", flush=True)

rese = _getListEDN(namespace)
if len(rese) < 1: # error condition from _getListEDN()
return []

# gather all the tags and add the ones with the latest date
repos: List[Tuple[str, int]] = []
tmp: Dict[str, int] = {}
for img in rese["docker-repository-tags"]["data"]:
if "image" in img and "docker.image/tags" in img["image"]:
name: str = ""
# For now, take the longest tag, assuming it is the most accurate
# name (eg, 8 vs 8.1 vs 8.1.2). This may need to be adjusted...
for tagname in img["image"]["docker.image/tags"]:
if len(tagname) > len(name):
name = tagname
repo: str
sha256: str
repo, sha256 = digest.split("@")

m: int = 0
if (
"docker.image/created-at" in img["image"]
and img["image"]["docker.image/created-at"] is not None
):
# convert to expected format (epoch)
m = int(img["image"]["docker.image/created-at"].strftime("%s"))

if name not in tmp or m > tmp[name]:
tmp[name] = m

for name in tmp:
repos.append((name, tmp[name]))

if sys.stdout.isatty():
print(" done!")

return copy.deepcopy(repos)
return ("", repo, sha256)

def fetchScanReport(
self,
Expand All @@ -285,8 +225,8 @@ def fetchScanReport(
priorities: List[str] = [],
) -> Tuple[List[ScanOCI], str]:
"""Obtain the security manifest for the specified repo@sha256:..."""
if "/" not in repo_full or "@sha256:" not in repo_full:
error("Please use REPO/TAG@sha256:<sha256>", do_exit=False)
if "@sha256:" not in repo_full:
error("Please use REPO@sha256:SHA256", do_exit=False)
return [], ""

purls: Dict[str, List[str]] = _fetchPackageURLs(repo_full.split("@")[-1])
Expand All @@ -306,7 +246,7 @@ def fetchScanReport(
return [], ""

url: str = "https://dso.docker.com/images/%s/digests/%s" % (
repo_full.split("/")[0],
repo_full.split("@")[0],
repo_full.split("@")[1],
)

Expand All @@ -326,6 +266,10 @@ def fetchScanReport(

return ocis, ""

def getOCIsForNamespace(self, _: str) -> List[Tuple[str, int]]: # pragma: nocover
# dso doesn't have a concept of namespaces
raise NotImplementedError

def getReposForNamespace(self, _: str) -> List[str]: # pragma: nocover
# dso doesn't have a concept of repos within namespaces
raise NotImplementedError
Expand Down Expand Up @@ -784,6 +728,75 @@ def _getListEDN(namespace: str, days: int = 365) -> Dict:
return copy.deepcopy(rese)


# $ curl -X POST https://api.dso.docker.com/datalog/shared-vulnerability/queries
# --data-binary '_getListEDN()'
# {
# "docker-repository-tags": {
# "data": [
# {
# "image": {
# "docker.image/digest": "sha256:af27abadb0a5e58b01e58806a02aca8c46d4c2b0823d6077d13de7ade017e9a9",
# "docker.image/created-at": "2022-12-16 00:23:40+00:00",
# "docker.image/tags": [
# "1.0-foo",
# "1-foo",
# "foo"
# ]
# }
# }
# ],
# "basis-t": "12345678",
# "tx": "12345678901234"
# },
# "extensions": {
# "x-atomist-correlation-id": "81e2aee7-13d1-4097-93aa-90841e5bd43b"
# }
# }
def _getTagsForRepo(repo_name: str) -> List[Tuple[str, int]]:
"""Obtain the list of DockerDSO tags for the specified repo"""
if ":" in repo_name or "@" in repo_name or "/" in repo_name:
error("Please use REPO (without :TAG or @sha256:SHA256)")
return [] # for tests

if sys.stdout.isatty():
print("Fetching list of repos:", end="", flush=True)

rese = _getListEDN(repo_name)
if len(rese) < 1: # error condition from _getListEDN()
return []

# gather all the tags and add the ones with the latest date
repos: List[Tuple[str, int]] = []
tmp: Dict[str, int] = {}
for img in rese["docker-repository-tags"]["data"]:
if "image" in img and "docker.image/tags" in img["image"]:
name: str = ""
# For now, take the longest tag, assuming it is the most accurate
# name (eg, 8 vs 8.1 vs 8.1.2). This may need to be adjusted...
for tagname in img["image"]["docker.image/tags"]:
if len(tagname) > len(name):
name = tagname

m: int = 0
if (
"docker.image/created-at" in img["image"]
and img["image"]["docker.image/created-at"] is not None
):
# convert to expected format (epoch)
m = int(img["image"]["docker.image/created-at"].strftime("%s"))

if name not in tmp or m > tmp[name]:
tmp[name] = m

for name in tmp:
repos.append((name, tmp[name]))

if sys.stdout.isatty():
print(" done!")

return copy.deepcopy(repos)


#
# CLI mains
#
Expand All @@ -797,10 +810,10 @@ def main_dso_dump_reports():
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=textwrap.dedent(
"""\
dso-dump-reports pulls all the latest security reports for OCI images in
REPO and outputs them to:
dso-dump-reports pulls all the latest security reports for the tagged images in
the REPO and outputs them to:
/path/to/reports/YY/MM/DD/dso/REPO/TAG/SHA256.json
/path/to/reports/YY/MM/DD/dso/REPO/SHA256.json
Eg, to pull all dso security scan reports for org 'foo':
Expand Down Expand Up @@ -830,7 +843,7 @@ def main_dso_dump_reports():
sr = DockerDSOSecurityReportNew()

# Find latest digest for all images
oci_names: List[Tuple[str, int]] = sr.getOCIsForNamespace(args.name)
oci_names: List[Tuple[str, int]] = _getTagsForRepo(args.name)
if len(oci_names) == 0:
error("Could not enumerate any OCI image names")
return # for tests
Expand All @@ -842,12 +855,12 @@ def main_dso_dump_reports():
if sys.stdout.isatty(): # pragma: nocover
print(".", end="", flush=True)

name: str = "%s/%s" % (args.name, oci.split("/", maxsplit=5)[-1])
name: str = "%s:%s" % (args.name, oci)
digest: str = sr.getDigestForImage(name)
if digest == "":
warn("Could not find digest for %s" % name)
continue
ocis.append("%s@%s" % (name, digest.split("@")[1]))
ocis.append("%s@%s" % (args.name, digest.split("@")[1]))

if sys.stdout.isatty(): # pragma: nocover
print(" done!", flush=True)
Expand All @@ -859,12 +872,12 @@ def main_dso_dump_reports():
# dso doesn't have dates or times in the security report, so we will
# store them in a folder under today's date. Since the report path comes
# from the date the report was fetched, we'll first search for the report
# by the dso/TAG/SHA256.json to see if we previously downloaded it.
# by the dso/REPO/SHA256.json to see if we previously downloaded it.

# gather a list of potentially matching filenames
json_files: Dict[str, str] = {}
for root, _, files in os.walk(args.path):
if "/dso/%s/" % args.name not in root: # quick prune
if not root.endswith("/dso/%s" % args.name): # quick prune
continue
for f in files:
if f.endswith(".json"):
Expand Down Expand Up @@ -900,7 +913,7 @@ def main_dso_dump_reports():
if len(j) == 0:
continue

repo_name: str = full_name.split("@")[0].split("/")[-1]
repo_name: str = full_name.split("@")[0]
sha256: str = full_name.split("@")[1].split(":")[-1]

if sha256 not in json_files: # create under dir with today's date
Expand All @@ -911,7 +924,6 @@ def main_dso_dump_reports():
"%0.2d" % dobj.month,
"%0.2d" % dobj.day,
"dso",
args.name,
repo_name,
]:
dir = os.path.join(dir, subdir)
Expand Down
Loading

0 comments on commit 56baaf9

Please sign in to comment.