Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/vunnel/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
bitnami,
chainguard,
chainguard_libraries,
cran,
debian,
echo,
eol,
Expand Down Expand Up @@ -44,6 +45,7 @@
amazon.Provider.name(): amazon.Provider,
arch.Provider.name(): arch.Provider,
bitnami.Provider.name(): bitnami.Provider,
cran.Provider.name(): cran.Provider,
debian.Provider.name(): debian.Provider,
echo.Provider.name(): echo.Provider,
fedora.Provider.name(): fedora.Provider,
Expand Down
77 changes: 77 additions & 0 deletions src/vunnel/providers/cran/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from vunnel import provider, result, schema
from vunnel.utils import timer

from .parser import Parser

if TYPE_CHECKING:
import datetime


def _default_runtime_config() -> provider.RuntimeConfig:
    """Build the runtime settings used when the caller does not supply any."""
    return provider.RuntimeConfig(
        result_store=result.StoreStrategy.SQLITE,
        existing_results=result.ResultStatePolicy.DELETE_BEFORE_WRITE,
    )


@dataclass
class Config:
    """Configuration for the CRAN provider.

    ``runtime`` defaults to a fresh ``provider.RuntimeConfig`` per instance that
    selects the SQLITE result store and the DELETE_BEFORE_WRITE policy for
    existing results.
    """

    runtime: provider.RuntimeConfig = field(default_factory=_default_runtime_config)


class Provider(provider.Provider):
    """Vunnel provider named ``cran`` that writes OSV-schema records.

    Records are produced by ``Parser.get()`` and are written through the
    results writer only when their schema version shares a major version with
    this provider's ``__schema__``.
    """

    __schema__ = schema.OSVSchema()
    __distribution_version__ = int(__schema__.major_version)

    def __init__(self, root: str, config: Config | None = None):
        """Create the provider and its parser.

        :param root: forwarded unchanged to ``provider.Provider.__init__``
        :param config: provider configuration; a default ``Config`` is built when falsy
        """
        config = config or Config()

        super().__init__(root, runtime_cfg=config.runtime)
        self.config = config
        self.logger.debug(f"config: {config}")

        self.parser = Parser(ws=self.workspace, logger=self.logger)

        # this provider requires the previous state from former runs
        provider.disallow_existing_input_policy(config.runtime)

    @classmethod
    def name(cls) -> str:
        """Return the provider's registry name."""
        return "cran"

    @classmethod
    def tags(cls) -> list[str]:
        """Return the tags describing this provider."""
        return ["vulnerability", "language"]

    @classmethod
    def compatible_schema(cls, schema_version: str) -> schema.Schema | None:
        """Return an OSV schema for ``schema_version`` if its major version matches ours.

        :param schema_version: schema version string taken from a record
        :return: the matching ``schema.OSVSchema``, or ``None`` when the major versions differ
        """
        candidate = schema.OSVSchema(schema_version)
        same_major = candidate.major_version == cls.__schema__.major_version
        return candidate if same_major else None

    def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]:
        """Fetch all records from the parser and write the compatible ones.

        :param last_updated: accepted for interface compatibility; not consulted here
        :return: the parser's source URLs and the number of entries reported by the writer
        """
        with timer(self.name(), self.logger), self.results_writer() as writer, self.parser:
            for vuln_id, vuln_schema_version, record in self.parser.get():
                vuln_schema = self.compatible_schema(vuln_schema_version)
                if vuln_schema:
                    writer.write(
                        identifier=vuln_id.lower(),
                        schema=vuln_schema,
                        payload=record,
                    )
                else:
                    self.logger.warning(
                        f"skipping vulnerability {vuln_id} with schema version {vuln_schema_version} "
                        f"as is incompatible with provider schema version {self.__schema__.version}",
                    )

        return self.parser.urls, len(writer)
80 changes: 80 additions & 0 deletions src/vunnel/providers/cran/git.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from __future__ import annotations

import logging
import os
import shlex
import shutil
import subprocess
import tempfile
from dataclasses import dataclass

from vunnel import utils


@dataclass
class GitRevision:
    """Pairing of a commit identifier with a file path."""

    # commit identifier
    sha: str

    # file path associated with the commit
    file: str


class GitWrapper:
    """Small wrapper around the ``git`` command line for cloning one branch of a repository.

    Construction verifies that a ``git`` executable can be run. ``delete_repo``
    removes the checkout directory and ``clone_repo`` clones ``source`` at
    ``branch`` into ``checkout_dest``.
    """

    _check_cmd_ = "git --version"
    _is_git_repo_cmd_ = "git rev-parse --is-inside-work-tree"
    _clone_cmd_ = "git clone -b {branch} {src} {dest}"
    _check_out_cmd_ = "git checkout {branch}"

    def __init__(
        self,
        source: str,
        branch: str,
        checkout_dest: str,
        logger: logging.Logger | None = None,
    ):
        """Store the clone parameters and verify that git is available.

        :param source: repository URL (or path) handed to ``git clone``
        :param branch: branch name handed to ``git clone -b``
        :param checkout_dest: directory the repository is cloned into
        :param logger: logger to use; one named after this class is created when omitted
        :raises Exception: re-raised from ``_exec_cmd`` when ``git --version`` cannot be run
        """
        self.src = source
        self.branch = branch
        self.dest = checkout_dest
        # kept as a public attribute; no method of this class reads it
        self.workspace = tempfile.gettempdir()

        if not logger:
            logger = logging.getLogger(self.__class__.__name__)
        self.logger = logger

        try:
            out = self._exec_cmd(self._check_cmd_)
            self.logger.trace(f"git executable verified using cmd: {self._check_cmd_}, output: {out}")  # type: ignore[attr-defined]
        except Exception:
            # "except Exception" rather than a bare except: interpreter-level signals such as
            # KeyboardInterrupt should not be reported as a missing git executable
            self.logger.exception('could not find required "git" executable. Please install git on host')
            raise

    def delete_repo(self) -> None:
        """Remove the checkout directory if it exists; removal errors are ignored."""
        if os.path.exists(self.dest):
            self.logger.debug("deleting existing repository")
            shutil.rmtree(self.dest, ignore_errors=True)

    @utils.retry_with_backoff()
    def clone_repo(self) -> None:
        """Clone ``self.src`` at ``self.branch`` into ``self.dest``.

        :raises Exception: any failure from the underlying git command is logged and re-raised
        """
        try:
            self.logger.info(f"cloning git repository {self.src} branch {self.branch} to {self.dest}")
            # _exec_cmd tokenizes the command with shlex.split, so every substituted value is
            # quoted first; otherwise a destination path containing whitespace would be split
            # into several arguments and the clone would fail or target the wrong directory
            cmd = self._clone_cmd_.format(
                src=shlex.quote(self.src),
                dest=shlex.quote(self.dest),
                branch=shlex.quote(self.branch),
            )
            out = self._exec_cmd(cmd)
            self.logger.debug(f"initialized git repo, cmd: {cmd}, output: {out}")
        except Exception:
            self.logger.exception(f"failed to clone git repository {self.src} branch {self.branch} to {self.dest}")
            raise

    def _exec_cmd(self, cmd: str) -> str:
        """
        Run a command without a shell and return its standard output.

        :param cmd: the full command line as one string (e.g. "git --version"); it is
            tokenized with shlex.split, so arguments containing whitespace must be quoted
        :return: the command's stdout decoded as text
        :raises Exception: any error (including subprocess.CalledProcessError for a non-zero
            exit status) is logged and re-raised unchanged
        """
        try:
            self.logger.trace(f"running: {cmd}")  # type: ignore[attr-defined]
            cmd_list = shlex.split(cmd)
            # S603 disable explanation: running git commands by design
            return subprocess.check_output(cmd_list, text=True, stderr=subprocess.PIPE)  # noqa: S603
        except Exception as e:
            self.logger.exception(f"error executing command: {cmd}")
            # stderr is captured (not inherited), so it is only available on the exception;
            # log it so the command's own explanation of the failure is not lost
            captured_stderr = getattr(e, "stderr", None)
            if captured_stderr:
                self.logger.error(f"command stderr: {captured_stderr}")
            raise
133 changes: 133 additions & 0 deletions src/vunnel/providers/cran/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from __future__ import annotations

import logging
import os
from typing import TYPE_CHECKING, Any

import yaml

from vunnel.tool import fixdate
from vunnel.utils import osv

if TYPE_CHECKING:
from collections.abc import Generator
from types import TracebackType

from vunnel.workspace import Workspace

from .git import GitWrapper

# Default OSV schema version to use when not specified in the YAML file
_DEFAULT_SCHEMA_VERSION_ = "1.6.1"

# Repository directory name for the R advisory database
_REPO_DIR_NAME_ = "r-advisory-database"


class Parser:
    """Clone the R advisory database and yield its advisories as OSV-style records.

    The parser is a context manager that delegates entering/exiting to its fix-date
    finder. ``get()`` re-clones the repository, reads every ``*.yaml`` file below the
    ``vulns`` directory and yields ``(id, schema_version, record)`` tuples.
    """

    _git_src_url_ = "https://github.com/RConsortium/r-advisory-database.git"
    _git_src_branch_ = "main"

    def __init__(
        self,
        ws: Workspace,
        fixdater: fixdate.Finder | None = None,
        logger: logging.Logger | None = None,
    ):
        """Prepare the parser and the git wrapper used to fetch the advisory database.

        :param ws: workspace whose ``input_path`` receives the repository checkout
        :param fixdater: fix-date finder; ``fixdate.default_finder(ws)`` is used when omitted
        :param logger: logger to use; one named after this class is created when omitted
        """
        if not fixdater:
            fixdater = fixdate.default_finder(ws)
        self.fixdater = fixdater
        self.workspace = ws
        self.git_url = self._git_src_url_
        self.git_branch = self._git_src_branch_
        self.urls = [self._git_src_url_]

        if not logger:
            logger = logging.getLogger(self.__class__.__name__)
        self.logger = logger

        checkout_dest = os.path.join(self.workspace.input_path, _REPO_DIR_NAME_)
        self.git_wrapper = GitWrapper(
            source=self.git_url,
            branch=self.git_branch,
            checkout_dest=checkout_dest,
            logger=self.logger,
        )

    def __enter__(self) -> Parser:
        """Enter the fix-date finder's context and return this parser."""
        self.fixdater.__enter__()
        return self

    def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None:
        """Exit the fix-date finder's context, forwarding the exception details."""
        self.fixdater.__exit__(exc_type, exc_val, exc_tb)

    def _load(self) -> Generator[dict[str, Any]]:
        """Yield the parsed content of every ``*.yaml`` file under the checkout's ``vulns`` directory.

        Directories and files are visited in sorted order. Files that fail to parse,
        are empty, or do not contain a mapping at the top level are skipped (the
        first and last with a warning).
        """
        self.logger.info(f"loading data from git repository {self.git_url}")

        vuln_data_dir = os.path.join(self.workspace.input_path, _REPO_DIR_NAME_, "vulns")
        for root, dirs, files in os.walk(vuln_data_dir):
            # sorting in place makes os.walk descend into sub-directories in a stable order
            dirs.sort()
            for file in sorted(files):
                if not file.endswith(".yaml"):
                    continue
                full_path = os.path.join(root, file)
                try:
                    # parse fully and close the file before yielding, so no file handle
                    # stays open while the consumer processes the record
                    with open(full_path, encoding="utf-8") as f:
                        document = yaml.safe_load(f)
                except yaml.YAMLError as e:
                    self.logger.warning(f"failed to parse YAML file {full_path}: {e}")
                    continue

                if document is None:
                    # empty file: nothing to emit
                    continue
                if not isinstance(document, dict):
                    # downstream code indexes the record by key, so anything but a mapping is unusable
                    self.logger.warning(
                        f"skipping YAML file {full_path}: expected a mapping but found {type(document).__name__}",
                    )
                    continue
                yield document

    def _normalize(self, vuln_entry: dict[str, Any]) -> tuple[str, str, dict[str, Any]]:
        """Reshape one advisory in place and return it with its id and schema version.

        :param vuln_entry: advisory mapping as loaded from YAML; it is modified in place
        :return: ``(id, schema_version, vuln_entry)`` where the schema version falls back to
            the module default when the advisory does not declare one
        :raises KeyError: when the advisory has no ``id``
        """
        self.logger.trace("normalizing vulnerability data")  # type: ignore[attr-defined]

        vuln_id = vuln_entry["id"]
        vuln_schema = vuln_entry.get("schema_version", _DEFAULT_SCHEMA_VERSION_)

        # RSEC YAML files use "upstream" for CVE references, but OSV schema uses "aliases".
        # Normalize by moving upstream values to aliases.
        upstream = vuln_entry.pop("upstream", None)
        if upstream:
            if isinstance(upstream, str):
                # a scalar "upstream: CVE-..." must be treated as one identifier,
                # not iterated character by character
                upstream = [upstream]
            existing_aliases = vuln_entry.get("aliases") or []
            for cve in upstream:
                if cve not in existing_aliases:
                    existing_aliases.append(cve)
            vuln_entry["aliases"] = existing_aliases

        # Prepend canonical references. grype-db uses the first reference URL as the
        # dataSource field, so the advisory database link should come first.
        canonical_refs: list[dict[str, str]] = []
        affected = vuln_entry.get("affected")
        if affected:
            # "or {}" also covers an explicit "package: null" in the YAML
            package_name = (affected[0].get("package") or {}).get("name")
            if package_name:
                advisory_url = self.git_url.removesuffix(".git")
                canonical_refs.append(
                    {
                        "type": "ADVISORY",
                        "url": f"{advisory_url}/blob/{self.git_branch}/vulns/{package_name}/{vuln_id}.yaml",
                    },
                )
        canonical_refs.append(
            {
                "type": "WEB",
                "url": f"https://osv.dev/vulnerability/{vuln_id}",
            },
        )

        existing_refs = vuln_entry.get("references") or []
        vuln_entry["references"] = canonical_refs + existing_refs

        return vuln_id, vuln_schema, vuln_entry

    def get(self) -> Generator[tuple[str, str, dict[str, Any]]]:
        """Re-clone the advisory repository and yield every normalized advisory.

        :return: generator of ``(id, schema_version, record)`` tuples
        """
        # Initialize the git repository
        self.git_wrapper.delete_repo()
        self.git_wrapper.clone_repo()

        self.fixdater.download()

        # Load the data from the git repository
        for vuln_entry in self._load():
            if vuln_entry is None:
                continue
            # Patch fix dates first, then normalize the loaded data
            osv.patch_fix_date(vuln_entry, self.fixdater)
            yield self._normalize(vuln_entry)
15 changes: 12 additions & 3 deletions tests/quality/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ yardstick:
# Note:
# - ALWAYS leave the "import-db" annotation as-is
# - this version should ALWAYS match that of the other "grype" tool below
version: main+import-db=build/vulnerability.db
version: feat-osv-fixes-etc+import-db=build/vulnerability.db
takes: SBOM

- name: grype
Expand All @@ -36,7 +36,7 @@ yardstick:
# - a repo reference and optional "@branch" (e.g. "github.com/my-user-fork/grype@dev-fix-foo")
# Note:
# - this version should ALWAYS match that of the other "grype" tool above
version: main+import-db=https://grype.anchore.io/databases/v6/vulnerability-db_v6.0.2_2025-07-10T01:31:11Z_1752120925.tar.zst
version: feat-osv-fixes-etc+import-db=https://grype.anchore.io/databases/v6/vulnerability-db_v6.0.2_2025-07-10T01:31:11Z_1752120925.tar.zst
takes: SBOM
label: reference

Expand All @@ -47,7 +47,7 @@ grype_db:
# - a branch name (e.g. "dev-fix-foo")
# - a repo reference and optional "@branch" (e.g. "my-user-fork/grype-db@dev-fix-foo")
# - a local file path (e.g. "file://~/code/grype-db")
version: main
version: grype-osv

tests:

Expand Down Expand Up @@ -456,3 +456,12 @@ tests:
- nvd:cpe
validations:
- *default-validations

- provider: cran
images:
- docker.io/anchore/test_images:r-packages-2f30eae@sha256:ba8a2263b89b5852fb7d41a662969d1f5d68b9020cca64f0a64a4505ce67fbfe
expected_namespaces:
- cran:language:R
validations:
- <<: *default-validations
max_year: 2025
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
id: RSEC-2023-3
schema_version: "1.6.1"
details: The jsonlite R package is exposed to a vulnerability due to its use of yajl library version 2.1.0.
The vulnerability originates from the yajl_tree_parse function within yajl. Attackers can exploit this flaw
to cause a memory leak, which will result in out-of-memory in server and lead to a crash.
summary: Memory leak vulnerability
affected:
- package:
name: jsonlite
ecosystem: CRAN
ranges:
- type: ECOSYSTEM
events:
- introduced: 0.9.12
- fixed: 1.8.8
versions:
- 0.9.12
- 0.9.13
references:
- type: WEB
url: https://github.com/jeroen/jsonlite/pull/421
upstream:
- CVE-2023-33460
modified: "2025-05-16T00:12:44Z"
published: "2023-07-18T04:37:21.600Z"
Loading
Loading