Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/pythonpublish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ jobs:
runs-on: ubuntu-22.04

steps:
- uses: actions/checkout@v6
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Set up Python
uses: actions/setup-python@v6
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: '3.x'
- name: Install dependencies
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/zizmor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: GitHub Actions Security Analysis with zizmor 🌈

on:
push:
branches: ["main"]
branches: ["master"]
pull_request:
branches: ["**"]

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ $(VENV)/pyvenv.cfg: pyproject.toml
lint: $(VENV)/pyvenv.cfg
uv run ruff format --check && \
uv run ruff check && \
uv run mypy
uv run mypy && \
uv run interrogate -c pyproject.toml .

.PHONY: format
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ select = ["ALL"]
# D203 and D213 are incompatible with D211 and D212 respectively.
# COM812 and ISC001 can cause conflicts when using ruff as a formatter.
# See https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules.
ignore = ["D203", "D213", "COM812", "ISC001"]
# S603 fires on any subprocess call with a list argument. All subprocess calls in this
# project use validated executables (resolved via shutil.which) with controlled arguments.
ignore = ["D203", "D213", "COM812", "ISC001", "S603"]

[tool.ruff.lint.per-file-ignores]
"src/it_depends/_cli.py" = [
Expand Down
212 changes: 120 additions & 92 deletions src/it_depends/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import webbrowser
from pathlib import Path
from sqlite3 import OperationalError
from typing import TYPE_CHECKING

from .audit import vulnerabilities
from .config import Settings
Expand All @@ -18,6 +19,11 @@
from .logger import setup_logger
from .sbom import cyclonedx_to_json

if TYPE_CHECKING:
from collections.abc import Callable

from .cache import PackageRepository

logger = logging.getLogger(__name__)


Expand All @@ -37,18 +43,9 @@ def parse_path_or_package_name(
return dependency


def main() -> None: # noqa: C901, PLR0912, PLR0915, PLR0911
settings = Settings() # type: ignore[call-arg]
setup_logger(settings.log_level)

# If max_workers isn't provided, use the number of CPUs.
# If that fails, use 1.
if settings.max_workers == -1:
settings.max_workers = os.cpu_count() or 1

logger.info("Starting it-depends with settings: %s", settings)

# Parse the target(s) -- either a path or a package name
def _parse_targets(
settings: Settings,
) -> tuple[SourceRepository | Dependency, SourceRepository | Dependency | None]:
try:
repo = parse_path_or_package_name(settings.target)

Expand All @@ -59,60 +56,125 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915, PLR0911
except ValueError as e:
msg = str(e)
logger.exception(msg)
return
sys.exit(1)

# If this is a source repository, make sure the folder exists
if isinstance(repo, SourceRepository) and not repo.path.exists():
logger.error("The specified path does not exist: %s", repo.path)
sys.exit(1)

return repo, to_compare


def _clear_cache(settings: Settings) -> None:
if not settings.clear_cache:
return
db_path = Path(settings.database)
if db_path.exists():
db_path.unlink()
logger.info("Cache cleared.")
else:
logger.info("No cache exists.")


def _list_resolvers(repo: SourceRepository | Dependency, target: str) -> None:
    """Print every known resolver to stdout with its status on stderr.

    Resolver names go to stdout and status annotations to stderr so that
    the bare name list can be piped to another command, e.g.::

        $ it-depends "pip:numpy" --list | xargs -n1 > resolvers.txt
        $ cat resolvers.txt
    """
    sys.stdout.flush()
    path = repo.path.absolute() if isinstance(repo, SourceRepository) else target
    sys.stderr.write(f"Available resolvers for {path}:\n")
    sys.stderr.flush()
    for resolver_name, resolver in sorted((c.name, c) for c in resolvers()):
        # Pad names to a 12-character column so the statuses line up.
        sys.stdout.write(resolver_name.ljust(12))
        sys.stdout.flush()
        availability = resolver.is_available()
        if not availability:
            sys.stderr.write(f"\tnot available: {availability.reason}")
            sys.stderr.flush()
        elif isinstance(repo, SourceRepository) and not resolver.can_resolve_from_source(repo):
            sys.stderr.write("\tincompatible with this path")
            sys.stderr.flush()
        elif isinstance(repo, Dependency) and repo.source != resolver.name:
            sys.stderr.write("\tincompatible with this package specifier")
        else:
            sys.stderr.write("\tenabled")
            sys.stderr.flush()

        sys.stdout.write("\n")
        sys.stdout.flush()

# Clear the database cache
if settings.clear_cache:
db_path = Path(settings.database)
if db_path.exists():
db_path.unlink()
logger.info("Cache cleared.")

def _get_output_writer(settings: Settings) -> Callable[[str], object]:
if settings.output_file is None:
return logger.info
if not settings.force and settings.output_file.exists():
logger.error("%s already exists. Re-run with `--force` to overwrite the file.", settings.output_file)
sys.exit(1)
return settings.output_file.write_text


def _write_output(
    settings: Settings,
    cache: DBPackageCache,
    package_list: PackageRepository,
    to_compare: SourceRepository | Dependency | None,
    *,
    output_write: Callable[[str], object],
) -> None:
    """Render the resolved dependency data in the requested output format.

    When ``to_compare`` is given, the graph distance between the two
    resolutions is emitted instead of the graph itself.

    Raises:
        NotImplementedError: If ``settings.output_format`` is unsupported.

    """
    if to_compare is not None:
        # Comparison mode: resolve the second target and report graph distance.
        comparison = resolve(
            to_compare,
            cache=cache,
            depth_limit=settings.depth_limit,
            max_workers=settings.max_workers,
        )
        distance = package_list.to_graph().distance_to(comparison.to_graph(), normalize=settings.normalize)
        output_write(str(distance))
        return

    fmt = settings.output_format
    if fmt == "dot":
        output_write(cache.to_dot(package_list.source_packages).source)
    elif fmt == "html":
        output_write(graph_to_html(package_list, collapse_versions=not settings.all_versions))
        # If the page was written to disk, open it in the default browser.
        if isinstance(settings.output_file, Path) and settings.output_file.exists():
            webbrowser.open(str(settings.output_file.absolute()))
    elif fmt == "json":
        output_write(json.dumps(package_list.to_obj(), sort_keys=True, indent=4))
    elif fmt == "cyclonedx":
        merged_sbom = None
        for source_package in package_list.source_packages:
            for bom in resolve_sbom(source_package, package_list, order_ascending=not settings.latest_resolution):
                merged_sbom = bom if merged_sbom is None else merged_sbom | bom
                # Only the first valid resolution per source package is kept.
                # TODO(@evandowning): Provide a means for enumerating all valid SBOMs  # noqa: TD003, FIX002
                break
        if merged_sbom is None:
            logger.error("No SBOM found for %s", settings.target)
        else:
            output_write(cyclonedx_to_json(merged_sbom.to_cyclonedx()))
    else:
        msg = f"TODO: Implement output format {settings.output_format}"
        raise NotImplementedError(msg)


def main() -> None:
settings = Settings() # type: ignore[call-arg]
setup_logger(settings.log_level)

# If max_workers isn't provided, use the number of CPUs.
# If that fails, use 1.
if settings.max_workers == -1:
settings.max_workers = os.cpu_count() or 1

logger.info("Starting it-depends with settings: %s", settings)

repo, to_compare = _parse_targets(settings)
_clear_cache(settings)

# List the available resolvers
if settings.list:
# NOTE: This is so the user can pipe the output to another command.
# For example:
# $ it-depends "pip:numpy" --list | xargs -n1 > resolvers.txt
# $ cat resolvers.txt
sys.stdout.flush()
path = repo.path.absolute() if isinstance(repo, SourceRepository) else settings.target
sys.stderr.write(f"Available resolvers for {path}:\n")
sys.stderr.flush()
for name, classifier in sorted((c.name, c) for c in resolvers()):
sys.stdout.write(name + " " * (12 - len(name)))
sys.stdout.flush()
available = classifier.is_available()
if not available:
sys.stderr.write(f"\tnot available: {available.reason}")
sys.stderr.flush()
elif isinstance(repo, SourceRepository) and not classifier.can_resolve_from_source(repo):
sys.stderr.write("\tincompatible with this path")
sys.stderr.flush()
elif isinstance(repo, Dependency) and repo.source != classifier.name:
sys.stderr.write("\tincompatible with this package specifier")
else:
sys.stderr.write("\tenabled")
sys.stderr.flush()

sys.stdout.write("\n")
sys.stdout.flush()
_list_resolvers(repo, settings.target)
return

try:
if settings.output_file is None:
output_write = logger.info
else:
output_write = settings.output_file.write_text # type: ignore[assignment]
if not settings.force and settings.output_file.exists():
logger.error("%s already exists. Re-run with `--force` to overwrite the file.", settings.output_file)
return
output_write = _get_output_writer(settings)

with DBPackageCache(settings.database) as cache:
try:
Expand All @@ -126,47 +188,15 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915, PLR0911
if not settings.clear_cache or settings.target.strip():
msg = f"{e!s}"
logger.exception(msg)
return
sys.exit(1)
if not package_list:
logger.error("Try --list to check for available resolvers for %s", settings.target)

# TODO(@evandowning): Should the cache be updated instead???? # noqa: TD003, FIX002
if settings.audit:
package_list = vulnerabilities(package_list)

if to_compare is not None:
to_compare_list = resolve(
to_compare,
cache=cache,
depth_limit=settings.depth_limit,
max_workers=settings.max_workers,
)
output_write(
str(package_list.to_graph().distance_to(to_compare_list.to_graph(), normalize=settings.normalize))
)
elif settings.output_format == "dot":
output_write(cache.to_dot(package_list.source_packages).source)
elif settings.output_format == "html":
output_write(graph_to_html(package_list, collapse_versions=not settings.all_versions))
if isinstance(settings.output_file, Path) and settings.output_file.exists():
webbrowser.open(str(settings.output_file.absolute()))
elif settings.output_format == "json":
output_write(json.dumps(package_list.to_obj(), sort_keys=True, indent=4))
elif settings.output_format == "cyclonedx":
sbom = None
for p in package_list.source_packages:
for bom in resolve_sbom(p, package_list, order_ascending=not settings.latest_resolution):
sbom = bom if sbom is None else sbom | bom
# only get the first resolution
# TODO(@evandowning): Provide a means for enumerating all valid SBOMs # noqa: TD003, FIX002
break
if sbom is None:
logger.error("No SBOM found for %s", settings.target)
else:
output_write(cyclonedx_to_json(sbom.to_cyclonedx()))
else:
msg = f"TODO: Implement output format {settings.output_format}"
raise NotImplementedError(msg)
_write_output(settings, cache, package_list, to_compare, output_write=output_write)

except OperationalError as e:
msg = (
Expand All @@ -176,11 +206,9 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915, PLR0911
"scratch."
)
logger.exception(msg)
return
sys.exit(1)
finally:
if settings.output_file is not None and settings.output_file.exists():
logger.info("Output saved to %s", settings.output_file.absolute())
if settings.output_file is not None and not settings.output_file.exists():
logger.error("Output wasn't saved. Output file %s does not exist.", settings.output_file.absolute())

return
25 changes: 25 additions & 0 deletions src/it_depends/_exec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Executable resolution utilities."""

from __future__ import annotations

import shutil


def resolve_executable(name: str) -> str:
    """Resolve a command name to its full filesystem path via PATH lookup.

    Args:
        name: The name of the executable to locate.

    Returns:
        The fully-qualified path to the executable.

    Raises:
        FileNotFoundError: If the executable is not found on PATH.

    """
    if (located := shutil.which(name)) is None:
        message = f"`{name}` executable not found in PATH"
        raise FileNotFoundError(message)
    return located
7 changes: 4 additions & 3 deletions src/it_depends/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ class OSVProject(VulnerabilityProvider):
def query(self, pkg: Package) -> Iterable[OSVVulnerability]:
"""Query the OSV project for vulnerabilities in package."""
q = {"version": str(pkg.version), "package": {"name": pkg.name}}
r = post(OSVProject.QUERY_URL, json=q).json() # noqa: S113
return map(OSVVulnerability.from_osv_dict, r.get("vulns", []))
r = post(OSVProject.QUERY_URL, json=q, timeout=30)
r.raise_for_status()
return map(OSVVulnerability.from_osv_dict, r.json().get("vulns", []))


def vulnerabilities(repo: PackageRepository, nworkers: int | None = None) -> PackageRepository:
Expand All @@ -83,7 +84,7 @@ def _get_vulninfo(pkg: Package) -> tuple[Package, frozenset[Vulnerability]]:
# Do not modify pkg here to ensure no concurrent
# modifications, instead return and let the main
# thread handle the updates.
return (pkg, frozenset({vuln: vuln for vuln in ret}))
return (pkg, frozenset(ret))

with (
ThreadPoolExecutor(max_workers=nworkers) as executor,
Expand Down
Loading
Loading