diff --git a/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py b/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py index c48fd2eca..f7162a386 100644 --- a/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py +++ b/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import base64 import hashlib import json @@ -12,8 +14,12 @@ from dataclasses import dataclass from pathlib import Path, PosixPath from queue import Queue +from typing import TYPE_CHECKING from urllib.parse import urlparse +if TYPE_CHECKING: + from jumpstarter.common.oci import OciCredentials + import click import pexpect import requests @@ -137,7 +143,9 @@ def flash( # noqa: C901 if headers: headers = self._validate_header_dict(headers) - oci_username, oci_password = self._resolve_oci_credentials(path, oci_username, oci_password) + oci_creds = self._resolve_oci_credentials(path, oci_username, oci_password) + oci_username = oci_creds.username + oci_password = oci_creds.plain_password should_download_to_httpd = True image_url = "" original_http_url = None @@ -1291,34 +1299,17 @@ def _validate_bearer_token(self, token: str | None) -> str | None: return token - def _validate_oci_credentials(self, username: str | None, password: str | None) -> tuple[str | None, str | None]: - if username is not None: - username = username.strip() - if password is not None: - password = password.strip() - - if username == "": - username = None - if password == "": - password = None - - if bool(username) != bool(password): - raise click.ClickException( - "OCI authentication requires both OCI_USERNAME and OCI_PASSWORD " - "environment variables (or both oci_username and oci_password arguments)" - ) - - return username, password - def _resolve_oci_credentials( self, path: PathBuf, username: str | None, password: str | None - ) -> tuple[str | None, str | None]: - if username is None and password is None and path.startswith("oci://"): - from jumpstarter.common.oci import resolve_oci_credentials - - username, password = resolve_oci_credentials(str(path)) + ) -> "OciCredentials": + from jumpstarter.common.oci import OciCredentials, resolve_oci_credentials - return self._validate_oci_credentials(username, password) + if username is not None or password is not None or str(path).startswith("oci://"): + try: + return resolve_oci_credentials(str(path), username=username, password=password) + except ValueError as err: + raise click.ClickException(str(err)) from err + return OciCredentials() def _fls_oci_auth_env(self, path: PathBuf, creds_file: str | None) -> str: if not str(path).startswith("oci://") or not creds_file: diff --git a/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client_test.py b/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client_test.py index c9888654a..bf5baa224 100644 --- a/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client_test.py +++ b/python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client_test.py @@ -47,24 +47,24 @@ def test_validate_bearer_token_fails_invalid(): client._validate_bearer_token('token"with"quotes') -def test_validate_oci_credentials_fails_when_partial(): - """Test OCI credential validation fails when only one value is provided""" +def test_resolve_oci_credentials_fails_when_partial(): + """Test OCI credential resolution fails when only one value is provided""" client = MockFlasherClient() with pytest.raises(click.ClickException, match="OCI authentication requires both"): - client._validate_oci_credentials("myuser", None) + client._resolve_oci_credentials("oci://quay.io/org/image:tag", "myuser", None) with pytest.raises(click.ClickException, match="OCI authentication requires both"): - client._validate_oci_credentials(None, "mypassword") + client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, "mypassword") -def test_validate_oci_credentials_accepts_pair_and_strips_whitespace(): - """Test OCI credential validation accepts full username/password pair""" +def test_resolve_oci_credentials_accepts_pair_and_strips_whitespace(): + """Test OCI credential resolution accepts full username/password pair and strips whitespace""" client = MockFlasherClient() - username, password = client._validate_oci_credentials(" myuser ", " mypassword ") - assert username == "myuser" - assert password == "mypassword" + creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", " myuser ", " mypassword ") + assert creds.username == "myuser" + assert creds.plain_password == "mypassword" def test_resolve_oci_credentials_reads_env_for_oci_path(monkeypatch): @@ -73,9 +73,9 @@ def test_resolve_oci_credentials_reads_env_for_oci_path(monkeypatch): monkeypatch.setenv("OCI_USERNAME", "env-user") monkeypatch.setenv("OCI_PASSWORD", "env-pass") - username, password = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None) - assert username == "env-user" - assert password == "env-pass" + creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None) + assert creds.username == "env-user" + assert creds.plain_password == "env-pass" def test_resolve_oci_credentials_ignores_env_for_non_oci_path(monkeypatch): @@ -84,30 +84,54 @@ def test_resolve_oci_credentials_ignores_env_for_non_oci_path(monkeypatch): monkeypatch.setenv("OCI_USERNAME", "env-user") monkeypatch.setenv("OCI_PASSWORD", "env-pass") - username, password = client._resolve_oci_credentials("https://example.com/image.raw.xz", None, None) - assert username is None - assert password is None + creds = client._resolve_oci_credentials("https://example.com/image.raw.xz", None, None) + assert creds.username is None + assert creds.password is None def test_resolve_oci_credentials_partial_env_falls_through_to_auth_file(monkeypatch): """Partial env vars should fall through to auth file lookup, not error.""" from unittest.mock import patch + from pydantic import SecretStr + + from jumpstarter.common.oci import OciCredentials + client = MockFlasherClient() monkeypatch.setenv("OCI_USERNAME", "env-user") monkeypatch.delenv("OCI_PASSWORD", raising=False) - # When auth file has no match, result is (None, None) — no error - with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=(None, None)): - username, password = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None) - assert username is None - assert password is None + # When auth file has no match, result is unauthenticated — no error + with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=OciCredentials()): + creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None) + assert creds.username is None + assert creds.password is None # When auth file has a match, those credentials are used - with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=("fileuser", "filepass")): - username, password = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None) - assert username == "fileuser" - assert password == "filepass" + with patch( + "jumpstarter.common.oci.read_auth_file_credentials", + return_value=OciCredentials(username="fileuser", password=SecretStr("filepass")), + ): + creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None) + assert creds.username == "fileuser" + assert creds.plain_password == "filepass" + + +def test_resolve_oci_credentials_normalizes_empty_strings(monkeypatch): + """Empty-string username/password should be treated as absent and fall through.""" + from unittest.mock import patch + + from jumpstarter.common.oci import OciCredentials + + client = MockFlasherClient() + monkeypatch.delenv("OCI_USERNAME", raising=False) + monkeypatch.delenv("OCI_PASSWORD", raising=False) + + with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=OciCredentials()) as mock_auth: + creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", "", "") + assert creds.username is None + assert creds.password is None + mock_auth.assert_called_once() def test_fls_oci_auth_env_sources_credentials_file(): diff --git a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py index d22753d28..7d73c424d 100644 --- a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py +++ b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py @@ -105,19 +105,9 @@ async def flash_oci( if not oci_url.startswith("oci://"): raise ValueError(f"OCI URL must start with oci://, got: {oci_url}") - # If explicit credentials were provided, validate immediately - if oci_username or oci_password: - if bool(oci_username) != bool(oci_password): - raise ValueError("OCI authentication requires both username and password") - else: - # Fall back to env vars, then container auth files - from jumpstarter.common.oci import resolve_oci_credentials + from jumpstarter.common.oci import resolve_oci_credentials - oci_username, oci_password = resolve_oci_credentials(oci_url) - if oci_username and oci_password: - self.logger.info("Using OCI registry credentials from environment or auth file") - elif oci_username or oci_password: - raise ValueError("OCI authentication requires both username and password") + creds = resolve_oci_credentials(oci_url, username=oci_username, password=oci_password) target_path = str(self.parent.validate_partition(partition)) @@ -130,10 +120,10 @@ async def flash_oci( fls_cmd = [fls_binary, "from-url", oci_url, target_path] fls_env = None - if oci_username and oci_password: + if creds.is_authenticated: fls_env = os.environ.copy() - fls_env["FLS_REGISTRY_USERNAME"] = oci_username - fls_env["FLS_REGISTRY_PASSWORD"] = oci_password + fls_env["FLS_REGISTRY_USERNAME"] = creds.username + fls_env["FLS_REGISTRY_PASSWORD"] = creds.plain_password self.logger.info(f"Running fls: {' '.join(fls_cmd)}") diff --git a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py index 129e144ee..186fa21b3 100644 --- a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py +++ b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py @@ -14,6 +14,7 @@ from jumpstarter_driver_qemu.driver import Qemu, QemuFlasher +from jumpstarter.common.oci import OciCredentials from jumpstarter.common.utils import serve @@ -316,7 +317,7 @@ async def test_flash_oci_no_credentials(): # Ensure OCI env vars are not set so driver doesn't pick them up env_clean = {k: v for k, v in os.environ.items() if k not in ("OCI_USERNAME", "OCI_PASSWORD")} with patch.dict(os.environ, env_clean, clear=True): - with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=(None, None)): + with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=OciCredentials()): with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): with patch( "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process diff --git a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py index 774caacdb..cd495efa0 100644 --- a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py +++ b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py @@ -1,6 +1,11 @@ +from __future__ import annotations + from dataclasses import dataclass from pathlib import Path -from typing import Dict, Optional +from typing import TYPE_CHECKING, Dict, Optional + +if TYPE_CHECKING: + from jumpstarter.common.oci import OciCredentials import click from jumpstarter_driver_composite.client import CompositeClient @@ -275,16 +280,11 @@ def _flash_operation(): return self._execute_flash_operation(_flash_operation, power_off=power_off) - def _read_oci_credentials(self, oci_url: str): + def _read_oci_credentials(self, oci_url: str) -> OciCredentials: """Read OCI registry credentials from environment variables or auth files.""" from jumpstarter.common.oci import resolve_oci_credentials - username, password = resolve_oci_credentials(oci_url) - - if bool(username) != bool(password): - raise click.ClickException("OCI authentication requires both username and password") - - return username, password + return resolve_oci_credentials(oci_url) def _flash_oci_auto_impl( self, @@ -292,7 +292,7 @@ def _flash_oci_auto_impl( partitions: Dict[str, str] | None = None, ): """Core implementation of OCI flash without wrapper logic.""" - oci_username, oci_password = self._read_oci_credentials(oci_url) + creds = self._read_oci_credentials(oci_url) self.logger.info("Checking for fastboot devices on Exporter...") detection_result = self.call("detect_fastboot_device", 5, 2.0) @@ -307,8 +307,8 @@ def _flash_oci_auto_impl( "flash_oci_image", oci_url, partitions, - oci_username, - oci_password, + creds.username, + creds.plain_password, ) # Display FLS output to user diff --git a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client_test.py b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client_test.py index 125f0b103..715e55b92 100644 --- a/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client_test.py +++ b/python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client_test.py @@ -4,8 +4,10 @@ import click import pytest from jumpstarter_driver_pyserial.driver import PySerial +from pydantic import SecretStr from .driver import RideSXDriver +from jumpstarter.common.oci import OciCredentials from jumpstarter.common.utils import serve @@ -57,20 +59,21 @@ def test_validate_partition_mappings(ridesx_client): def test_flash_oci_auto_success(ridesx_client): """Test successful flash_oci_auto call""" - with patch.object(ridesx_client, "call") as mock_call: - mock_call.side_effect = [ - None, # boot_to_fastboot call - {"status": "device_found", "device_id": "ABC123"}, - {"status": "success"}, - ] + with patch("jumpstarter.common.oci.resolve_oci_credentials", return_value=OciCredentials()): + with patch.object(ridesx_client, "call") as mock_call: + mock_call.side_effect = [ + None, # boot_to_fastboot call + {"status": "device_found", "device_id": "ABC123"}, + {"status": "success"}, + ] - result = ridesx_client.flash_oci_auto("oci://quay.io/org/image:tag") + result = ridesx_client.flash_oci_auto("oci://quay.io/org/image:tag") - assert result == {"status": "success"} - # Verify flash_oci_image was called with the OCI URL - flash_call = mock_call.call_args_list[2] - assert flash_call[0][0] == "flash_oci_image" - assert flash_call[0][1] == "oci://quay.io/org/image:tag" + assert result == {"status": "success"} + # Verify flash_oci_image was called with the OCI URL + flash_call = mock_call.call_args_list[2] + assert flash_call[0][0] == "flash_oci_image" + assert flash_call[0][1] == "oci://quay.io/org/image:tag" def test_flash_oci_auto_error_cases(ridesx_client): @@ -84,11 +87,30 @@ def test_flash_oci_auto_error_cases(ridesx_client): ridesx_client.flash_oci_auto("quay.io/org/image:tag") # No device found - with patch.object(ridesx_client, "call") as mock_call: - mock_call.return_value = {"status": "no_device_found", "device_id": None} - - with pytest.raises(click.ClickException, match="No fastboot devices found"): - ridesx_client.flash_oci_auto("oci://image:tag") + with patch("jumpstarter.common.oci.resolve_oci_credentials", return_value=OciCredentials()): + with patch.object(ridesx_client, "call") as mock_call: + mock_call.return_value = {"status": "no_device_found", "device_id": None} + + with pytest.raises(click.ClickException, match="No fastboot devices found"): + ridesx_client.flash_oci_auto("oci://image:tag") + + +def test_flash_oci_auto_passes_authenticated_credentials(ridesx_client): + """Authenticated credentials should pass username and plain password to flash_oci_image.""" + creds = OciCredentials(username="myuser", password=SecretStr("mypass")) + with patch("jumpstarter.common.oci.resolve_oci_credentials", return_value=creds): + with patch.object(ridesx_client, "call") as mock_call: + mock_call.side_effect = [ + None, # boot_to_fastboot + {"status": "device_found", "device_id": "ABC123"}, + {"status": "success"}, + ] + + ridesx_client.flash_oci_auto("oci://quay.io/org/image:tag") + + flash_call = mock_call.call_args_list[2] + assert flash_call[0][3] == "myuser" + assert flash_call[0][4] == "mypass" # _execute_flash_command Tests diff --git a/python/packages/jumpstarter/jumpstarter/common/oci.py b/python/packages/jumpstarter/jumpstarter/common/oci.py index 80630d898..11333df45 100644 --- a/python/packages/jumpstarter/jumpstarter/common/oci.py +++ b/python/packages/jumpstarter/jumpstarter/common/oci.py @@ -13,48 +13,152 @@ """ import base64 +import binascii import json import logging import os +import re +import tomllib +from functools import lru_cache from pathlib import Path +from typing import Any from urllib.parse import urlparse +from pydantic import BaseModel, ConfigDict, Field, SecretStr, ValidationError, field_validator, model_validator + logger = logging.getLogger(__name__) +__all__ = [ + "OciCredentials", + "parse_oci_registry", + "read_auth_file_credentials", + "resolve_oci_credentials", +] -def parse_oci_registry(oci_url: str) -> str: - """Extract the registry hostname from an OCI URL. - Handles URLs in the format ``oci://registry/repo:tag`` as well as plain - image references like ``registry/repo:tag``. +class OciCredentials(BaseModel): + """Resolved OCI registry credentials. - Args: - oci_url: OCI image reference, optionally prefixed with ``oci://``. + Construction enforces that username and password are both set or both None. + Passing only one raises ``ValidationError``. + """ - Returns: - Registry hostname (with port if present), e.g. ``quay.io`` or - ``registry.example.com:5000``. + model_config = ConfigDict(frozen=True) + + username: str | None = None + password: SecretStr | None = Field(default=None) + + @field_validator("username", "password", mode="before") + @classmethod + def _normalize_empty(cls, v: str | SecretStr | None) -> str | None: + if isinstance(v, SecretStr): + v = v.get_secret_value() + if isinstance(v, str): + v = v.strip() + return v if v else None + return v + + @property + def plain_password(self) -> str | None: + """Return the raw password string, or None if unset.""" + return self.password.get_secret_value() if self.password else None + + @property + def is_authenticated(self) -> bool: + return self.username is not None and self.password is not None + + @model_validator(mode="after") + def _check_both_or_neither(self) -> "OciCredentials": + if bool(self.username) != bool(self.password): + raise ValueError("OCI authentication requires both username and password") + return self + + +def _get_registries_conf_paths() -> list[Path]: + """Return ordered list of registries.conf paths to search.""" + return [ + Path.home() / ".config" / "containers" / "registries.conf", + Path("/etc/containers/registries.conf"), + Path("/usr/share/containers/registries.conf"), + ] + + +@lru_cache(maxsize=1) +def _get_unqualified_search_registries() -> tuple[str, ...]: + """Read unqualified-search-registries from containers registries.conf. + + Falls back to ``("docker.io",)`` if no config is found. + """ + for conf_path in _get_registries_conf_paths(): + if not conf_path.is_file(): + continue + try: + with open(conf_path, "rb") as f: + data = tomllib.load(f) + registries = tuple( + r for r in data.get("unqualified-search-registries", []) if isinstance(r, str) and r.strip() + ) + if registries: + logger.debug("Read unqualified-search-registries from %s: %s", conf_path, registries) + return registries + except (OSError, tomllib.TOMLDecodeError) as e: + logger.debug("Skipping registries.conf %s: %s", conf_path, e) + continue + return ("docker.io",) + + +def _parse_registries_for_url(oci_url: str) -> tuple[str, ...]: + """Return possible registries for an OCI URL. + + For explicit registry URLs, returns a single-element tuple. + For bare image names (e.g. ``ubuntu:latest``), returns the host's + configured ``unqualified-search-registries``. """ url = oci_url if url.startswith("oci://"): url = url[len("oci://") :] - # Strip any tag or digest suffix for parsing purposes - # e.g. "quay.io/org/repo:tag" -> we just need "quay.io" - # The registry is the first path component + # Strip digest references before parsing — "ubuntu@sha256:abc" would + # otherwise have the colon corrupt port/tag disambiguation. + url = re.sub(r"@sha(256|384|512):[a-fA-F0-9]+", "", url) + parts = url.split("/", 1) registry = parts[0] - # Remove tag/digest if someone passed just "registry:tag" with no path - if "/" not in url and ":" in registry: - # Could be registry:port or image:tag — if the part after : is numeric - # it's a port, otherwise it's a tag on a Docker Hub image - host_port = registry.split(":", 1) - if host_port[1].isdigit(): - return registry # registry:port - return "docker.io" # bare image like "ubuntu:latest" + if "/" not in url: + if ":" in registry: + host_port = registry.split(":", 1) + if host_port[1].isdigit(): + return (registry,) # registry:port + return _get_unqualified_search_registries() # bare image like "ubuntu:latest" + if "." not in registry and registry != "localhost": + return _get_unqualified_search_registries() # bare image like "ubuntu" + else: + # namespace/image form (e.g. "library/ubuntu") — first segment has + # no dot and isn't localhost, so it's not a registry hostname. + if "." not in registry and registry != "localhost": + if ":" not in registry or not registry.split(":", 1)[1].isdigit(): + return _get_unqualified_search_registries() - return registry + return (registry,) + + +def parse_oci_registry(oci_url: str) -> str: + """Extract the registry hostname from an OCI URL. + + Handles URLs in the format ``oci://registry/repo:tag`` as well as plain + image references like ``registry/repo:tag``. For bare image names, + returns the first entry from the host's ``unqualified-search-registries`` + (defaults to ``docker.io``). + + Args: + oci_url: OCI image reference, optionally prefixed with ``oci://``. + + Returns: + Registry hostname (with port if present), e.g. ``quay.io`` or + ``registry.example.com:5000``. + """ + return _parse_registries_for_url(oci_url)[0] def _get_auth_file_paths() -> list[Path]: @@ -119,7 +223,7 @@ def _normalize_registry(registry: str) -> str: return registry -def _lookup_credentials_in_auth_data(auth_data: dict, registry: str) -> tuple[str | None, str | None]: +def _lookup_credentials_in_auth_data(auth_data: dict[str, Any], registry: str) -> OciCredentials: """Look up credentials for a registry in parsed auth file data. Args: @@ -127,95 +231,127 @@ def _lookup_credentials_in_auth_data(auth_data: dict, registry: str) -> tuple[st registry: Normalized registry hostname to look up. Returns: - Tuple of (username, password), or (None, None) if not found. + OciCredentials with both fields set, or ``OciCredentials()`` if not found. """ auths = auth_data.get("auths", {}) if not auths: - return None, None + return OciCredentials() # Try to find a matching entry — normalize all keys for comparison for key, value in auths.items(): + if not isinstance(value, dict): + continue if _normalize_registry(key) == registry: # The "auth" field is base64(username:password) auth_b64 = value.get("auth") if auth_b64: try: - decoded = base64.b64decode(auth_b64).decode("utf-8") + decoded = base64.b64decode(auth_b64, validate=True).decode("utf-8") username, password = decoded.split(":", 1) - return username, password - except (ValueError, UnicodeDecodeError) as e: - logger.warning(f"Failed to decode auth entry for {key}: {e}") - continue + if username and password: + return OciCredentials(username=username, password=password) + except (binascii.Error, ValueError, UnicodeDecodeError) as e: + logger.warning("Failed to decode auth entry for %s: %s", key, e) # Some auth files use separate username/password fields username = value.get("username") password = value.get("password") if username and password: - return username, password + try: + return OciCredentials(username=username, password=password) + except (ValueError, ValidationError) as e: + logger.warning("Failed to validate auth entry for %s: %s", key, e) - return None, None + return OciCredentials() -def read_auth_file_credentials( - oci_url: str, -) -> tuple[str | None, str | None]: +def read_auth_file_credentials(oci_url: str) -> OciCredentials: """Read registry credentials from container auth files. Searches standard auth file locations for credentials matching the - registry in the given OCI URL. Returns the first match found. + registry in the given OCI URL. For bare image names, tries all + registries from ``unqualified-search-registries`` in registries.conf. + Returns the first match found. Args: oci_url: OCI image reference (e.g. ``oci://quay.io/org/image:tag``). Returns: - Tuple of (username, password), or (None, None) if no credentials - are found. + OciCredentials with both fields set, or ``OciCredentials()`` with + both fields None if no credentials are found. """ - registry = parse_oci_registry(oci_url) - normalized_registry = _normalize_registry(registry) + registries = _parse_registries_for_url(oci_url) - for auth_path in _get_auth_file_paths(): - if not auth_path.is_file(): - continue + for registry in registries: + normalized_registry = _normalize_registry(registry) - try: - auth_data = json.loads(auth_path.read_text(encoding="utf-8")) - except (json.JSONDecodeError, OSError) as e: - logger.debug(f"Skipping auth file {auth_path}: {e}") - continue + for auth_path in _get_auth_file_paths(): + if not auth_path.is_file(): + continue - username, password = _lookup_credentials_in_auth_data(auth_data, normalized_registry) - if username and password: - logger.info(f"Found OCI registry credentials for {registry} in {auth_path}") - return username, password + try: + auth_data = json.loads(auth_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError) as e: + logger.debug("Skipping auth file %s: %s", auth_path, e) + continue - logger.debug(f"No credentials found for registry {registry} in any auth file") - return None, None + creds = _lookup_credentials_in_auth_data(auth_data, normalized_registry) + if creds.is_authenticated: + logger.info("Found OCI registry credentials for %s in %s", registry, auth_path) + return creds + logger.debug("No credentials found for %s in any auth file", registries) + return OciCredentials() -def resolve_oci_credentials(oci_url: str) -> tuple[str | None, str | None]: - """Resolve OCI registry credentials from environment or auth files. - Checks OCI_USERNAME/OCI_PASSWORD environment variables first, - then falls back to container auth files (auth.json / Docker config.json). +def resolve_oci_credentials( + oci_url: str, + username: str | None = None, + password: str | None = None, +) -> OciCredentials: + """Resolve OCI registry credentials with three-level precedence. + + 1. Explicit ``username``/``password`` arguments (if either is non-empty). + 2. ``OCI_USERNAME``/``OCI_PASSWORD`` environment variables. + 3. Container auth files (auth.json / Docker config.json). Args: oci_url: OCI image reference (e.g. ``oci://quay.io/org/image:tag``). + username: Explicit registry username (takes highest priority). + password: Explicit registry password (takes highest priority). Returns: - Tuple of (username, password), or (None, None) if no credentials - are found from any source. - """ - username = os.environ.get("OCI_USERNAME") - password = os.environ.get("OCI_PASSWORD") + OciCredentials with both fields set, or both None. - if username and password: - logger.info("Using OCI registry credentials from environment variables") - return username, password + Raises: + ValueError: If only one of username/password is provided + (at explicit or env-var level). + """ + # Level 1: Explicit arguments + if username is not None or password is not None: + try: + creds = OciCredentials(username=username, password=password) + except ValidationError: + raise ValueError("OCI authentication requires both username and password") from None + if creds.is_authenticated: + return creds - if username or password: - logger.warning( - "Only one of OCI_USERNAME/OCI_PASSWORD is set; ignoring partial env credentials and checking auth files" - ) + # Level 2: Environment variables + env_username = os.environ.get("OCI_USERNAME") + env_password = os.environ.get("OCI_PASSWORD") + if env_username is not None or env_password is not None: + try: + creds = OciCredentials(username=env_username, password=env_password) + except ValidationError: + logger.warning( + "Only one of OCI_USERNAME/OCI_PASSWORD is set; " + "ignoring partial env credentials and checking auth files" + ) + else: + if creds.is_authenticated: + logger.info("Using OCI registry credentials from environment variables") + return creds + + # Level 3: Auth files return read_auth_file_credentials(oci_url) diff --git a/python/packages/jumpstarter/jumpstarter/common/oci_test.py b/python/packages/jumpstarter/jumpstarter/common/oci_test.py index ece249194..2293fa567 100644 --- a/python/packages/jumpstarter/jumpstarter/common/oci_test.py +++ b/python/packages/jumpstarter/jumpstarter/common/oci_test.py @@ -5,9 +5,14 @@ from unittest.mock import patch import pytest +from pydantic import ValidationError from .oci import ( + OciCredentials, + _get_auth_file_paths, + _get_unqualified_search_registries, _normalize_registry, + _parse_registries_for_url, parse_oci_registry, read_auth_file_credentials, resolve_oci_credentials, @@ -32,8 +37,20 @@ def test_standard_urls(self, oci_url, expected): assert parse_oci_registry(oci_url) == expected def test_bare_image_name_defaults_to_docker_hub(self): - # "ubuntu:latest" has no slash — it's a Docker Hub shorthand - assert parse_oci_registry("oci://ubuntu:latest") == "docker.io" + # "ubuntu:latest" has no slash — defaults to first unqualified-search-registry + with patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=["docker.io"], + ): + assert parse_oci_registry("oci://ubuntu:latest") == "docker.io" + + def test_bare_image_uses_configured_registry(self): + with patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=["registry.example.com", "docker.io"], + ): + assert parse_oci_registry("oci://ubuntu:latest") == "registry.example.com" + assert parse_oci_registry("oci://myimage") == "registry.example.com" class TestNormalizeRegistry: @@ -75,9 +92,9 @@ def test_reads_from_docker_config(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[config_path], ): - username, password = read_auth_file_credentials("oci://quay.io/org/image:latest") - assert username == "myuser" - assert password == "mypass" + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username == "myuser" + assert result.password.get_secret_value() == "mypass" def test_reads_from_podman_auth_json(self, tmp_path): auth_path = tmp_path / "auth.json" @@ -87,9 +104,9 @@ def test_reads_from_podman_auth_json(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path], ): - username, password = read_auth_file_credentials("oci://ghcr.io/org/repo:v1") - assert username == "ghuser" - assert password == "ghtoken" + result = read_auth_file_credentials("oci://ghcr.io/org/repo:v1") + assert result.username == "ghuser" + assert result.password.get_secret_value() == "ghtoken" def test_handles_docker_hub_url_variants(self, tmp_path): """Docker Hub credentials stored under various key formats should match.""" @@ -102,9 +119,9 @@ def test_handles_docker_hub_url_variants(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path], ): - username, password = read_auth_file_credentials("oci://docker.io/library/ubuntu:22.04") - assert username == "dockuser" - assert password == "dockpass" + result = read_auth_file_credentials("oci://docker.io/library/ubuntu:22.04") + assert result.username == "dockuser" + assert result.password.get_secret_value() == "dockpass" def test_returns_none_when_no_match(self, tmp_path): auth_path = tmp_path / "auth.json" @@ -114,18 +131,18 @@ def test_returns_none_when_no_match(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path], ): - username, password = read_auth_file_credentials("oci://ghcr.io/org/repo:v1") - assert username is None - assert password is None + result = read_auth_file_credentials("oci://ghcr.io/org/repo:v1") + assert result.username is None + assert result.password is None def test_returns_none_when_no_auth_files_exist(self): with patch( "jumpstarter.common.oci._get_auth_file_paths", return_value=[Path("/nonexistent/path/auth.json")], ): - username, password = read_auth_file_credentials("oci://quay.io/org/image:latest") - assert username is None - assert password is None + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username is None + assert result.password is None def test_skips_malformed_json(self, tmp_path): bad_file = tmp_path / "bad.json" @@ -138,9 +155,9 @@ def test_skips_malformed_json(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[bad_file, good_file], ): - username, password = read_auth_file_credentials("oci://quay.io/org/image:latest") - assert username == "user" - assert password == "pass" + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username == "user" + assert result.password.get_secret_value() == "pass" def test_supports_separate_username_password_fields(self, tmp_path): """Some tools write username/password directly instead of base64 auth.""" @@ -151,9 +168,22 @@ def test_supports_separate_username_password_fields(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path], ): - username, password = read_auth_file_credentials("oci://quay.io/org/image:latest") - assert username == "altuser" - assert password == "altpass" + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username == "altuser" + assert result.password.get_secret_value() == "altpass" + + def test_whitespace_only_separate_fields_skipped(self, tmp_path): + """Whitespace-only password in separate fields should be skipped, not crash.""" + auth_path = tmp_path / "auth.json" + auth_path.write_text(_make_auth_json({"quay.io": {"username": "user", "password": " "}})) + + with patch( + "jumpstarter.common.oci._get_auth_file_paths", + return_value=[auth_path], + ): + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username is None + assert result.password is None def test_first_matching_file_wins(self, tmp_path): """When multiple auth files have credentials, the first one wins.""" @@ -166,9 +196,9 @@ def test_first_matching_file_wins(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[first, second], ): - username, password = read_auth_file_credentials("oci://quay.io/org/image:latest") - assert username == "first_user" - assert password == "first_pass" + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username == "first_user" + assert result.password.get_secret_value() == "first_pass" def test_password_with_colon(self, tmp_path): """Passwords containing colons should be handled correctly.""" @@ -179,9 +209,9 @@ def test_password_with_colon(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path], ): - username, password = read_auth_file_credentials("oci://quay.io/org/image:latest") - assert username == "user" - assert password == "pass:with:colons" + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username == "user" + assert result.password.get_secret_value() == "pass:with:colons" def test_registry_with_port(self, tmp_path): auth_path = tmp_path / "auth.json" @@ -191,9 +221,9 @@ def test_registry_with_port(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path], ): - username, password = read_auth_file_credentials("oci://registry.local:5000/myrepo:latest") - assert username == "user" - assert password == "pass" + result = read_auth_file_credentials("oci://registry.local:5000/myrepo:latest") + assert result.username == "user" + assert result.password.get_secret_value() == "pass" def test_empty_auths_section(self, tmp_path): auth_path = tmp_path / "auth.json" @@ -203,9 +233,9 @@ def test_empty_auths_section(self, tmp_path): "jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path], ): - username, password = read_auth_file_credentials("oci://quay.io/org/image:latest") - assert username is None - assert password is None + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username is None + assert result.password is None class TestResolveOciCredentials: @@ -215,9 +245,9 @@ def test_env_vars_take_priority(self, tmp_path): with patch.dict(os.environ, {"OCI_USERNAME": "envuser", "OCI_PASSWORD": "envpass"}): with patch("jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path]): - username, password = resolve_oci_credentials("oci://quay.io/org/image:latest") - assert username == "envuser" - assert password == "envpass" + result = resolve_oci_credentials("oci://quay.io/org/image:latest") + assert result.username == "envuser" + assert result.password.get_secret_value() == "envpass" def test_falls_back_to_auth_file(self, tmp_path): auth_path = tmp_path / "auth.json" @@ -226,9 +256,9 @@ def test_falls_back_to_auth_file(self, tmp_path): env_clean = {k: v for k, v in os.environ.items() if k not in ("OCI_USERNAME", "OCI_PASSWORD")} with patch.dict(os.environ, env_clean, clear=True): with patch("jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path]): - username, password = resolve_oci_credentials("oci://quay.io/org/image:latest") - assert username == "fileuser" - assert password == "filepass" + result = resolve_oci_credentials("oci://quay.io/org/image:latest") + assert result.username == "fileuser" + assert result.password.get_secret_value() == "filepass" def test_partial_env_falls_back_to_auth_file(self, tmp_path): """When only one env var is set, fall through to auth file instead of returning partial.""" @@ -240,14 +270,430 @@ def test_partial_env_falls_back_to_auth_file(self, tmp_path): env_partial["OCI_USERNAME"] = "partialuser" with patch.dict(os.environ, env_partial, clear=True): with patch("jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path]): - username, password = resolve_oci_credentials("oci://quay.io/org/image:latest") - assert username == "fileuser" - assert password == "filepass" + result = resolve_oci_credentials("oci://quay.io/org/image:latest") + assert result.username == "fileuser" + assert result.password.get_secret_value() == "filepass" + + def test_partial_env_password_only_falls_back_to_auth_file(self, tmp_path): + """When only OCI_PASSWORD is set, fall through to auth file instead of returning partial.""" + auth_path = tmp_path / "auth.json" + auth_path.write_text(_make_auth_json({"quay.io": {"auth": _encode_auth("fileuser", "filepass")}})) + + env_partial = {k: v for k, v in os.environ.items() if k != "OCI_USERNAME"} + env_partial["OCI_PASSWORD"] = "partialpass" + with patch.dict(os.environ, env_partial, clear=True): + with patch("jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path]): + result = resolve_oci_credentials("oci://quay.io/org/image:latest") + assert result.username == "fileuser" + assert result.password.get_secret_value() == "filepass" + + def test_whitespace_env_vars_fall_through_to_auth_file(self, tmp_path): + """Whitespace-only env vars should not be treated as credentials.""" + auth_path = tmp_path / "auth.json" + auth_path.write_text(_make_auth_json({"quay.io": {"auth": _encode_auth("fileuser", "filepass")}})) + + env = {k: v for k, v in os.environ.items() if k not in ("OCI_USERNAME", "OCI_PASSWORD")} + env["OCI_USERNAME"] = " " + env["OCI_PASSWORD"] = " " + with patch.dict(os.environ, env, clear=True): + with patch("jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path]): + result = resolve_oci_credentials("oci://quay.io/org/image:latest") + assert result.username == "fileuser" + assert result.password.get_secret_value() == "filepass" def test_returns_none_when_no_source(self): env_clean = {k: v for k, v in os.environ.items() if k not in ("OCI_USERNAME", "OCI_PASSWORD")} with patch.dict(os.environ, env_clean, clear=True): with patch("jumpstarter.common.oci._get_auth_file_paths", return_value=[]): - username, password = resolve_oci_credentials("oci://quay.io/org/image:latest") - assert username is None - assert password is None + result = resolve_oci_credentials("oci://quay.io/org/image:latest") + assert result.username is None + assert result.password is None + + +class TestParseOciRegistryDigest: + """Digest references (image@sha256:...) must not corrupt registry parsing.""" + + def test_bare_image_with_digest(self): + with patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=["docker.io"], + ): + assert parse_oci_registry("oci://ubuntu@sha256:abc123") == "docker.io" + + def test_image_with_path_and_digest(self): + assert parse_oci_registry("oci://quay.io/org/repo@sha256:abc123") == "quay.io" + + def test_registry_port_with_digest(self): + assert parse_oci_registry("oci://registry.local:5000/repo@sha256:abc") == "registry.local:5000" + + +class TestGetAuthFilePaths: + """Verify _get_auth_file_paths reads env vars and produces correct ordering.""" + + def test_default_paths_without_env_vars(self): + env_clean = { + k: v for k, v in os.environ.items() if k not in ("REGISTRY_AUTH_FILE", "XDG_RUNTIME_DIR", "DOCKER_CONFIG") + } + with patch.dict(os.environ, env_clean, clear=True): + paths = _get_auth_file_paths() + path_strs = [str(p) for p in paths] + assert any(".config/containers/auth.json" in p for p in path_strs) + assert any(".docker/config.json" in p for p in path_strs) + + def test_registry_auth_file_takes_priority(self, tmp_path): + custom_path = str(tmp_path / "custom-auth.json") + env = { + k: v for k, v in os.environ.items() if k not in ("REGISTRY_AUTH_FILE", "XDG_RUNTIME_DIR", "DOCKER_CONFIG") + } + env["REGISTRY_AUTH_FILE"] = custom_path + with patch.dict(os.environ, env, clear=True): + paths = _get_auth_file_paths() + assert paths[0] == Path(custom_path) + + def test_xdg_runtime_dir_adds_podman_path(self, tmp_path): + env = { + k: v for k, v in os.environ.items() if k not in ("REGISTRY_AUTH_FILE", "XDG_RUNTIME_DIR", "DOCKER_CONFIG") + } + env["XDG_RUNTIME_DIR"] = str(tmp_path) + with patch.dict(os.environ, env, clear=True): + paths = _get_auth_file_paths() + assert Path(tmp_path / "containers" / "auth.json") in paths + + def test_docker_config_adds_custom_docker_path(self, tmp_path): + docker_dir = str(tmp_path / "mydocker") + env = { + k: v for k, v in os.environ.items() if k not in ("REGISTRY_AUTH_FILE", "XDG_RUNTIME_DIR", "DOCKER_CONFIG") + } + env["DOCKER_CONFIG"] = docker_dir + with patch.dict(os.environ, env, clear=True): + paths = _get_auth_file_paths() + assert Path(docker_dir) / "config.json" in paths + + def test_all_env_vars_set_produces_correct_order(self, tmp_path): + env = { + k: v for k, v in os.environ.items() if k not in ("REGISTRY_AUTH_FILE", "XDG_RUNTIME_DIR", "DOCKER_CONFIG") + } + env["REGISTRY_AUTH_FILE"] = str(tmp_path / "explicit.json") + env["XDG_RUNTIME_DIR"] = str(tmp_path / "xdg") + env["DOCKER_CONFIG"] = str(tmp_path / "dockercfg") + with patch.dict(os.environ, env, clear=True): + paths = _get_auth_file_paths() + path_strs = [str(p) for p in paths] + # REGISTRY_AUTH_FILE first, then XDG, then ~/.config, then DOCKER_CONFIG, then ~/.docker + assert path_strs[0] == str(tmp_path / "explicit.json") + assert "xdg/containers/auth.json" in path_strs[1] + assert ".config/containers/auth.json" in path_strs[2] + assert "dockercfg/config.json" in path_strs[3] + assert ".docker/config.json" in path_strs[4] + + +class TestInvalidBase64Auth: + """Malformed base64 in auth entries should be skipped without crashing.""" + + def test_garbage_base64_skipped(self, tmp_path): + auth_path = tmp_path / "auth.json" + auth_path.write_text(_make_auth_json({"quay.io": {"auth": "not-valid-base64!!!"}})) + + with patch( + "jumpstarter.common.oci._get_auth_file_paths", + return_value=[auth_path], + ): + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username is None + assert result.password is None + + def test_garbage_base64_falls_through_to_separate_fields(self, tmp_path): + auth_path = tmp_path / "auth.json" + auth_path.write_text( + _make_auth_json( + {"quay.io": {"auth": "not-valid!!!", "username": "fallback_user", "password": "fallback_pass"}} + ) + ) + + with patch( + "jumpstarter.common.oci._get_auth_file_paths", + return_value=[auth_path], + ): + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username == "fallback_user" + assert result.password.get_secret_value() == "fallback_pass" + + def test_empty_username_in_base64_falls_through(self, tmp_path): + """Base64 encoding of ':password' should not return ('', 'password').""" + auth_path = tmp_path / "auth.json" + auth_b64 = base64.b64encode(b":onlypassword").decode() + auth_path.write_text(_make_auth_json({"quay.io": {"auth": auth_b64}})) + + with patch( + "jumpstarter.common.oci._get_auth_file_paths", + return_value=[auth_path], + ): + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username is None + assert result.password is None + + def test_empty_password_in_base64_falls_through(self, tmp_path): + """Base64 encoding of 'username:' should not return ('username', '').""" + auth_path = tmp_path / "auth.json" + auth_b64 = base64.b64encode(b"onlyusername:").decode() + auth_path.write_text(_make_auth_json({"quay.io": {"auth": auth_b64}})) + + with patch( + "jumpstarter.common.oci._get_auth_file_paths", + return_value=[auth_path], + ): + result = read_auth_file_credentials("oci://quay.io/org/image:latest") + assert result.username is None + assert result.password is None + + +class TestOciCredentials: + def test_fields(self): + creds = OciCredentials(username="user", password="pass") + assert creds.username == "user" + assert creds.password.get_secret_value() == "pass" + + def test_plain_password(self): + creds = OciCredentials(username="user", password="pass") + assert creds.plain_password == "pass" + assert OciCredentials().plain_password is None + + def test_is_authenticated(self): + assert OciCredentials(username="user", password="pass").is_authenticated + assert not OciCredentials().is_authenticated + assert not OciCredentials(username=None, password=None).is_authenticated + + def test_rejects_asymmetric_at_construction(self): + with pytest.raises(ValueError, match="both username and password"): + OciCredentials(username="user", password=None) + with pytest.raises(ValueError, match="both username and password"): + OciCredentials(username=None, password="pass") + + def test_empty_strings_normalized_to_none(self): + creds = OciCredentials(username="", password="") + assert creds.username is None + assert creds.password is None + assert not creds.is_authenticated + + def test_username_with_empty_password_rejected(self): + with pytest.raises(ValueError, match="both username and password"): + OciCredentials(username="user", password="") + + def test_whitespace_strings_normalized_to_none(self): + creds = OciCredentials(username=" ", password=" ") + assert creds.username is None + assert creds.password is None + assert not creds.is_authenticated + + def test_strips_whitespace_from_credentials(self): + creds = OciCredentials(username=" user ", password=" pass ") + assert creds.username == "user" + assert creds.password.get_secret_value() == "pass" + + def test_frozen(self): + creds = OciCredentials(username="user", password="pass") + with pytest.raises(ValidationError): + creds.username = "other" + + def test_resolve_returns_oci_credentials_type(self, tmp_path): + auth_path = tmp_path / "auth.json" + auth_path.write_text(_make_auth_json({"quay.io": {"auth": _encode_auth("user", "pass")}})) + + env_clean = {k: v for k, v in os.environ.items() if k not in ("OCI_USERNAME", "OCI_PASSWORD")} + with patch.dict(os.environ, env_clean, clear=True): + with patch("jumpstarter.common.oci._get_auth_file_paths", return_value=[auth_path]): + result = resolve_oci_credentials("oci://quay.io/org/image:latest") + assert isinstance(result, OciCredentials) + assert result.is_authenticated + + +class TestUnqualifiedSearchRegistries: + """Verify registries.conf reading for bare image resolution.""" + + @pytest.fixture(autouse=True) + def _clear_cache(self): + _get_unqualified_search_registries.cache_clear() + yield + _get_unqualified_search_registries.cache_clear() + + def test_falls_back_to_docker_io_when_no_config(self, tmp_path): + with patch( + "jumpstarter.common.oci._get_registries_conf_paths", + return_value=[tmp_path / "nonexistent.conf"], + ): + result = _get_unqualified_search_registries() + assert result == ("docker.io",) + + def test_reads_from_registries_conf(self, tmp_path): + conf = tmp_path / "registries.conf" + conf.write_text('unqualified-search-registries = ["quay.io", "docker.io"]\n') + + with patch( + "jumpstarter.common.oci._get_registries_conf_paths", + return_value=[conf], + ): + result = _get_unqualified_search_registries() + assert result == ("quay.io", "docker.io") + + def test_skips_malformed_toml(self, tmp_path): + bad = tmp_path / "bad.conf" + bad.write_text("this is not valid toml [[[") + + good = tmp_path / "good.conf" + good.write_text('unqualified-search-registries = ["registry.example.com"]\n') + + with patch( + "jumpstarter.common.oci._get_registries_conf_paths", + return_value=[bad, good], + ): + result = _get_unqualified_search_registries() + assert result == ("registry.example.com",) + + def test_skips_config_without_key(self, tmp_path): + conf = tmp_path / "registries.conf" + conf.write_text('[registries.search]\nregistries = ["old-format"]\n') + + with patch( + "jumpstarter.common.oci._get_registries_conf_paths", + return_value=[conf], + ): + result = _get_unqualified_search_registries() + assert result == ("docker.io",) + + +class TestParseRegistriesForUrl: + """Verify _parse_registries_for_url returns correct registry lists.""" + + def test_explicit_registry_returns_single(self): + assert _parse_registries_for_url("oci://quay.io/org/image:tag") == ("quay.io",) + assert _parse_registries_for_url("oci://ghcr.io/user/repo:v1") == ("ghcr.io",) + + def test_registry_with_port_returns_single(self): + assert _parse_registries_for_url("oci://registry.local:5000/repo:tag") == ("registry.local:5000",) + + def test_bare_image_returns_all_configured(self): + with patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=("quay.io", "docker.io"), + ): + result = _parse_registries_for_url("oci://ubuntu:latest") + assert result == ("quay.io", "docker.io") + + def test_bare_image_no_tag_returns_all_configured(self): + with patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=("registry.example.com",), + ): + result = _parse_registries_for_url("oci://myimage") + assert result == ("registry.example.com",) + + def test_namespace_image_returns_all_configured(self): + with patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=("quay.io", "docker.io"), + ): + result = _parse_registries_for_url("oci://library/ubuntu") + assert result == ("quay.io", "docker.io") + + def test_localhost_returns_single(self): + assert _parse_registries_for_url("localhost/myrepo:tag") == ("localhost",) + + +class TestBareImageCredentialLookup: + """Verify credential lookup tries all configured registries for bare images.""" + + def test_finds_credentials_from_secondary_registry(self, tmp_path): + auth_path = tmp_path / "auth.json" + auth_path.write_text(_make_auth_json({"docker.io": {"auth": _encode_auth("dockuser", "dockpass")}})) + + with ( + patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=["quay.io", "docker.io"], + ), + patch( + "jumpstarter.common.oci._get_auth_file_paths", + return_value=[auth_path], + ), + ): + result = read_auth_file_credentials("oci://ubuntu:latest") + assert result.username == "dockuser" + assert result.password.get_secret_value() == "dockpass" + + def test_first_matching_registry_wins(self, tmp_path): + auth_path = tmp_path / "auth.json" + auth_path.write_text( + _make_auth_json( + { + "quay.io": {"auth": _encode_auth("quayuser", "quaypass")}, + "docker.io": {"auth": _encode_auth("dockuser", "dockpass")}, + } + ) + ) + + with ( + patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=["quay.io", "docker.io"], + ), + patch( + "jumpstarter.common.oci._get_auth_file_paths", + return_value=[auth_path], + ), + ): + result = read_auth_file_credentials("oci://ubuntu:latest") + assert result.username == "quayuser" + assert result.password.get_secret_value() == "quaypass" + + def test_no_match_in_any_registry(self, tmp_path): + auth_path = tmp_path / "auth.json" + auth_path.write_text(_make_auth_json({"ghcr.io": {"auth": _encode_auth("user", "pass")}})) + + with ( + patch( + "jumpstarter.common.oci._get_unqualified_search_registries", + return_value=["quay.io", "docker.io"], + ), + patch( + "jumpstarter.common.oci._get_auth_file_paths", + return_value=[auth_path], + ), + ): + result = read_auth_file_credentials("oci://ubuntu:latest") + assert result.username is None + assert result.password is None + + +class TestResolveOciCredentialsExplicitArgs: + """Verify resolve_oci_credentials with explicit username/password parameters.""" + + def test_explicit_args_take_highest_priority(self, monkeypatch): + monkeypatch.setenv("OCI_USERNAME", "env-user") + monkeypatch.setenv("OCI_PASSWORD", "env-pass") + + result = resolve_oci_credentials("oci://quay.io/org/image:tag", username="explicit", password="creds") + assert result.username == "explicit" + assert result.plain_password == "creds" + + def test_partial_explicit_args_raises_value_error(self): + with pytest.raises(ValueError, match="both username and password"): + resolve_oci_credentials("oci://quay.io/org/image:tag", username="user", password=None) + + with pytest.raises(ValueError, match="both username and password"): + resolve_oci_credentials("oci://quay.io/org/image:tag", username=None, password="pass") + + def test_empty_string_args_fall_through_to_env(self, monkeypatch): + monkeypatch.setenv("OCI_USERNAME", "env-user") + monkeypatch.setenv("OCI_PASSWORD", "env-pass") + + result = resolve_oci_credentials("oci://quay.io/org/image:tag", username="", password="") + assert result.username == "env-user" + assert result.plain_password == "env-pass" + + def test_none_args_fall_through_to_env(self, monkeypatch): + monkeypatch.setenv("OCI_USERNAME", "env-user") + monkeypatch.setenv("OCI_PASSWORD", "env-pass") + + result = resolve_oci_credentials("oci://quay.io/org/image:tag") + assert result.username == "env-user" + assert result.plain_password == "env-pass"