Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/scripts/dev-shell.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ source "$DEV_PYTHON_ENV_PATH/bin/activate"
pids=""

step "Installing editable version of vunnel"
pip install -e . > /dev/null &
uv pip install -e . > /dev/null &
pids="$pids $!"

step "Building grype"
Expand Down
8 changes: 5 additions & 3 deletions src/vunnel/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,15 @@ def convert_value(obj: Any) -> Any:

@cli.command(name="run", help="run a vulnerability provider")
@click.argument("provider_name", metavar="PROVIDER")
@click.option("--skip-download", is_flag=True, help="skip downloading data", default=False)
@click.option("--skip-download", is_flag=True, help="skip downloading data", default=None)
@click.pass_obj
def run_provider(cfg: config.Application, provider_name: str, skip_download: bool) -> None:
def run_provider(cfg: config.Application, provider_name: str, skip_download: bool | None) -> None:
logging.info(f"running {provider_name} provider in {cfg.root}")
config = cfg.providers.get(provider_name)
# technically config has type Any | None, so double check to appease mypy
if config and config.runtime and hasattr(config.runtime, "skip_download"):
# only override when explicitly passed; otherwise env var (e.g. VUNNEL_PROVIDERS_<X>_RUNTIME_SKIP_DOWNLOAD)
# applied via apply_env_overrides takes effect
if skip_download is not None and config and config.runtime and hasattr(config.runtime, "skip_download"):
config.runtime.skip_download = skip_download

with providers.create(provider_name, cfg.root, config=config) as provider:
Expand Down
49 changes: 39 additions & 10 deletions src/vunnel/providers/chainguard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import TYPE_CHECKING

from vunnel import provider, result, schema
from vunnel.providers.wolfi.parser import Parser
from vunnel.providers.wolfi.parser import SecDBParser, OSVParser
from vunnel.utils import timer

if TYPE_CHECKING:
Expand All @@ -23,9 +23,17 @@ class Config:
request_timeout: int = 125
# Override with VUNNEL_PROVIDERS_CHAINGUARD_SECDB_URL
secdb_url: str = "https://packages.cgr.dev/chainguard/security.json"
# Override with VUNNEL_PROVIDERS_CHAINGUARD_OSV_URL
osv_url: str = "https://advisories.cgr.dev/chainguard/v3/osv/chainguard-osv.tar.gz"
# Override with VUNNEL_PROVIDERS_CHAINGUARD_USE_OSV
use_osv: bool = False
# Override with VUNNEL_PROVIDERS_CHAINGUARD_OSV_MAX_WORKERS
osv_max_workers: int = 16


class Provider(provider.Provider):
# NOTE: schema and distribution version are actually set on init depending
# on which feed we configure the provider to use.
__schema__ = schema.OSSchema()
__distribution_version__ = int(__schema__.major_version)
Comment on lines +35 to 38

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a problem if this always reports OSSchema? I ensured the argument to writer.write below is the actual, correct schema


Expand All @@ -39,13 +47,30 @@ def __init__(self, root: str, config: Config | None = None):

self.logger.debug(f"config: {config}")

self.parser = Parser(
workspace=self.workspace,
url=config.secdb_url,
namespace=self._namespace,
download_timeout=self.config.request_timeout,
logger=self.logger,
)
if self.config.use_osv:
self.logger.info("Using OSV data source")
self.feed_url = self.config.osv_url
self.parser = OSVParser(
workspace=self.workspace,
url=config.osv_url,
namespace=self._namespace,
download_timeout=self.config.request_timeout,
logger=self.logger,
skip_download=self.config.runtime.skip_download,
max_workers=self.config.osv_max_workers,
)
self.schema = schema.OSVSchema(version="1.7.0")
else:
self.parser = SecDBParser(
workspace=self.workspace,
url=config.secdb_url,
namespace=self._namespace,
download_timeout=self.config.request_timeout,
logger=self.logger,
skip_download=self.config.runtime.skip_download,
)
self.feed_url = self.config.secdb_url
self.schema = schema.OSSchema()

# this provider requires the previous state from former runs
provider.disallow_existing_input_policy(config.runtime)
Expand All @@ -58,6 +83,10 @@ def name(cls) -> str:
def tags(cls) -> list[str]:
return ["vulnerability", "os"]

@classmethod
def supports_skip_download(cls) -> bool:
return True

def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]:
with timer(self.name(), self.logger):
with self.results_writer() as writer, self.parser:
Expand All @@ -66,8 +95,8 @@ def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int
for vuln_id, record in vuln_dict.items():
writer.write(
identifier=os.path.join(f"{self._namespace.lower()}:{release.lower()}", vuln_id),
schema=self.__schema__,
schema=self.schema,
payload=record,
)

return [self.config.secdb_url], len(writer)
return [self.feed_url], len(writer)
12 changes: 9 additions & 3 deletions src/vunnel/providers/wolfi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from vunnel import provider, result, schema
from vunnel.utils import timer

from .parser import Parser
from .parser import SecDBParser

if TYPE_CHECKING:
import datetime
Expand All @@ -24,7 +24,9 @@ class Config:
request_timeout: int = 125
# Override with VUNNEL_PROVIDERS_WOLFI_SECDB_URL
secdb_url: str = "https://packages.wolfi.dev/os/security.json"

# Override with VUNNEL_PROVIDERS_WOLFI_ENABLE
# Enable allows us to switch to an OSV feed in the future if/when one becomes available
enable: bool = True

class Provider(provider.Provider):
__schema__ = schema.OSSchema()
Expand All @@ -40,7 +42,7 @@ def __init__(self, root: str, config: Config | None = None):

self.logger.debug(f"config: {config}")

self.parser = Parser(
self.parser = SecDBParser(
workspace=self.workspace,
url=config.secdb_url,
namespace=self._namespace,
Expand All @@ -60,6 +62,10 @@ def tags(cls) -> list[str]:
return ["vulnerability", "os"]

def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]:
if not self.config.enable:
self.logger.info("Provider is disabled via config, skipping update")
return [], 0

with timer(self.name(), self.logger):
with self.results_writer() as writer, self.parser:
# TODO: tech debt: on subsequent runs, we should only write new vulns (this currently re-writes all)
Expand Down
Loading
Loading