Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 105 additions & 86 deletions src/apm_cli/deps/download_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@

import requests

from ..core.auth import HostInfo
from ..models.apm_package import DependencyReference
from ..utils.github_host import (
build_ado_api_url,
build_ado_https_clone_url,
build_ado_ssh_url,
build_ado_https_clone_url, # noqa: F401 -- re-exported for tests/back-compat
build_ado_ssh_url, # noqa: F401 -- re-exported for tests/back-compat
build_artifactory_archive_url,
build_https_clone_url,
build_raw_content_url,
build_ssh_url,
default_host,
is_azure_devops_hostname,
is_github_hostname,
)
from .host_backends import backend_for

# ---------------------------------------------------------------------------
# Module-level debug helper (mirrors the one in github_downloader so that
Expand Down Expand Up @@ -201,84 +202,77 @@ def build_repo_url(
Returns:
str: Repository URL suitable for git clone operations
"""
# Use dep_ref.host if available (for ADO), otherwise fall back to
# instance or default
# Resolve host (used for token-routing and as a fallback when
# ``dep_ref`` is missing for legacy callers).
if dep_ref and dep_ref.host:
host = dep_ref.host
else:
host = getattr(self._host, "github_host", None) or default_host()

# Check if this is Azure DevOps (either via dep_ref or host detection)
is_ado = (dep_ref and dep_ref.is_azure_devops()) or is_azure_devops_hostname(host)
# Pick the vendor-specific backend via ``classify_host`` -- this
# replaces the in-line ``if is_ado / elif is_github / else`` ladder
# with a single dispatch.
backend = backend_for(
dep_ref,
self._host.auth_resolver,
fallback_host=host,
)

is_ado = backend.kind == "ado"
is_insecure = bool(getattr(dep_ref, "is_insecure", False)) if dep_ref is not None else False

# Use provided token or fall back to instance default. Pass an empty
# string ("") explicitly to suppress the per-instance token (used by
# the TransportSelector for "plain HTTPS" / "SSH" attempts that must
# NOT embed credentials in the URL).
# Resolve the effective token. ``token == ""`` is the explicit
# "suppress per-instance default" signal used by the
# TransportSelector for plain-HTTPS / SSH attempts.
if token == "":
github_token = ""
ado_token = ""
effective_token: str | None = ""
elif token is not None:
effective_token = token
elif is_ado:
effective_token = self._host.ado_token
elif backend.is_github_family:
effective_token = self._host.github_token
else:
github_token = token if token is not None else self._host.github_token
ado_token = token if (token is not None and is_ado) else self._host.ado_token
# Generic hosts: backend never embeds tokens; pick None so the
# branch below produces the expected "no credential in URL" form.
effective_token = None

_debug(
f"build_repo_url: host={host}, is_ado={is_ado}, "
f"build_repo_url: host={host}, kind={backend.kind}, "
f"dep_ref={'present' if dep_ref else 'None'}, "
f"ado_org={dep_ref.ado_organization if dep_ref else None}"
)

if is_ado and dep_ref and dep_ref.ado_organization:
# Use Azure DevOps URL builders with ADO-specific token
if use_ssh:
return build_ado_ssh_url(
dep_ref.ado_organization, dep_ref.ado_project, dep_ref.ado_repo
)
elif auth_scheme == "bearer":
# Bearer tokens are injected via GIT_CONFIG env vars
# (Authorization header), NOT embedded in the clone URL.
return build_ado_https_clone_url(
dep_ref.ado_organization,
dep_ref.ado_project,
dep_ref.ado_repo,
token=None,
host=host,
)
elif ado_token:
return build_ado_https_clone_url(
dep_ref.ado_organization,
dep_ref.ado_project,
dep_ref.ado_repo,
token=ado_token,
host=host,
)
else:
return build_ado_https_clone_url(
dep_ref.ado_organization,
dep_ref.ado_project,
dep_ref.ado_repo,
host=host,
)
else:
# Determine if this host should receive a GitHub token
is_github = is_github_hostname(host)
# Thread the user-declared custom port (e.g. 7999 for Bitbucket DC)
# through the URL builders so neither SSH nor HTTPS attempts
# silently drop it.
port = dep_ref.port if dep_ref else None
# ADO without a parsed ``ado_organization`` cannot use the ADO
# builders (they need org/project/repo). Fall through to the
# generic GitHub-style URL the way the previous ladder did.
if is_ado and not (dep_ref and dep_ref.ado_organization):
backend = backend_for(
None,
self._host.auth_resolver,
fallback_host=host,
)

if dep_ref is None:
# Legacy no-dep_ref callers: preserve historical behaviour.
# Build URL directly from ``repo_ref`` + ``host`` since the
# backends require a dep_ref to read host/port/etc.
port = None
if use_ssh:
return build_ssh_url(host, repo_ref, port=port)
elif is_insecure:
netloc = f"{host}:{port}" if port else host
return f"http://{netloc}/{repo_ref}.git"
elif is_github and github_token:
# Only send GitHub tokens to GitHub hosts
return build_https_clone_url(host, repo_ref, token=github_token, port=port)
else:
# Generic hosts: plain HTTPS, let git credential helpers
# handle auth
return build_https_clone_url(host, repo_ref, token=None, port=port)
if is_insecure:
return f"http://{host}/{repo_ref}.git"
if backend.is_github_family and effective_token:
return build_https_clone_url(host, repo_ref, token=effective_token, port=port)
return build_https_clone_url(host, repo_ref, token=None, port=port)

if use_ssh:
return backend.build_clone_ssh_url(dep_ref)
if is_insecure:
return backend.build_clone_http_url(dep_ref)
return backend.build_clone_https_url(
dep_ref, token=effective_token, auth_scheme=auth_scheme
)

# ------------------------------------------------------------------
# Artifactory helpers
Expand Down Expand Up @@ -874,31 +868,56 @@ def _build_contents_api_urls(
) -> list[str]:
"""Return the ordered list of Contents-API URL candidates for *host*.

- github.com -> single api.github.com candidate
- *.ghe.com (GHE Cloud / GHE Data Residency) or GITHUB_HOST-declared
GHES -> single api.<host> candidate (skips Gitea v1 round-trip)
- generic host -> Gitea-native /api/v1/ then Gogs-compat /api/v3/

GitLab uses /api/v4/projects/:id/repository/files/... which has a
different shape; it is intentionally NOT included. GitLab support
is limited to git-clone operations.

``is_github_host`` lets the caller pass its already-computed
classification (which honours ``GITHUB_HOST``); when omitted we
fall back to ``is_github_hostname`` plus the GHES env-var check.
Thin wrapper around the per-host backends — the actual URL shape
lives on the backend. Kept as a static method on
:class:`DownloadDelegate` for back-compat with existing callers
and tests that monkey-patch it.
Comment thread
danielmeppiel marked this conversation as resolved.
Outdated
"""
from .host_backends import GenericGitBackend, GHECloudBackend, GHESBackend, GitHubBackend

if is_github_host is None:
is_github_host = is_github_hostname(host) or DownloadDelegate._is_configured_ghes(host)
if is_github_host:
if host.lower() == "github.com":
return [
f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}?ref={ref}"
]
return [f"https://api.{host}/repos/{owner}/{repo}/contents/{file_path}?ref={ref}"]
return [
f"https://{host}/api/v1/repos/{owner}/{repo}/contents/{file_path}?ref={ref}",
f"https://{host}/api/v3/repos/{owner}/{repo}/contents/{file_path}?ref={ref}",
]

host_lower = (host or "").lower()
if not is_github_host:
backend = GenericGitBackend(
host_info=HostInfo(
host=host,
kind="generic",
has_public_repos=False,
api_base=f"https://{host}",
)
)
elif host_lower == "github.com":
backend = GitHubBackend(
host_info=HostInfo(
host=host,
kind="github",
has_public_repos=True,
api_base="https://api.github.com",
)
)
elif host_lower.endswith(".ghe.com"):
backend = GHECloudBackend(
host_info=HostInfo(
host=host,
kind="ghe_cloud",
has_public_repos=False,
api_base=f"https://api.{host}",
Comment thread
danielmeppiel marked this conversation as resolved.
Outdated
)
)
else:
# Configured GHES (GITHUB_HOST=<custom-host>): api_base is
# ``https://{host}/api/v3``, not ``https://api.{host}``.
backend = GHESBackend(
host_info=HostInfo(
host=host,
kind="ghes",
has_public_repos=False,
api_base=f"https://{host}/api/v3",
)
)
return backend.build_contents_api_urls(owner, repo, file_path, ref)

@staticmethod
def _build_generic_host_auth_headers(
Expand Down
67 changes: 30 additions & 37 deletions src/apm_cli/deps/github_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,17 +1217,23 @@ def _resolve_commit_sha_for_ref(self, dep_ref: DependencyReference, ref: str) ->
return ref.lower()

try:
owner, repo = dep_ref.repo_url.split("/", 1)
except ValueError:
dep_ref.repo_url.split("/", 1)
except (AttributeError, ValueError):
return None

# Build commits API URL -- mirrors the Contents API host shape.
if host == "github.com":
api_url = f"https://api.github.com/repos/{owner}/{repo}/commits/{ref}"
elif host.lower().endswith(".ghe.com"):
api_url = f"https://api.{host}/repos/{owner}/{repo}/commits/{ref}"
else:
api_url = f"https://{host}/api/v3/repos/{owner}/{repo}/commits/{ref}"
# Build commits API URL via the per-host backend; this delegates
# the GitHub-family vs GHE-cloud vs GHES URL-shape decision to
# ``HostBackend.build_commits_api_url`` and gives us a None signal
# for already-resolved 40-char SHAs and for hosts without a cheap
# commit-resolve endpoint.
from .host_backends import backend_for

backend = backend_for(dep_ref, self.auth_resolver, fallback_host=host)
api_url = backend.build_commits_api_url(dep_ref, ref)
if api_url is None:
# Either the backend has no commit-resolve endpoint, the ref is
# already a 40-char SHA, or repo_url could not be split.
return None

# Resolve auth using the same path the file download uses.
org = None
Expand Down Expand Up @@ -1912,20 +1918,14 @@ def _shared_bare_clone_fn(bare_target: Path) -> None:

# For plugins without an explicit version, stamp with the short commit SHA.
package = validation_result.package
if (
validation_result.package_type == PackageType.MARKETPLACE_PLUGIN
and package.version == "0.0.0"
and resolved_commit != "unknown"
):
short_sha = resolved_commit[:7]
package.version = short_sha
apm_yml_path = target_path / "apm.yml"
if apm_yml_path.exists():
from ..utils.yaml_io import dump_yaml, load_yaml
from .package_validator import stamp_plugin_version

_data = load_yaml(apm_yml_path) or {}
_data["version"] = short_sha
dump_yaml(_data, apm_yml_path)
stamp_plugin_version(
package,
validation_result.package_type,
resolved_commit,
target_path,
)

# Update progress - complete
if progress_obj and progress_task_id is not None:
Expand Down Expand Up @@ -2352,21 +2352,14 @@ def download_package(

# For plugins without an explicit version, use the short commit SHA so the
# lock file and conflict detection have a meaningful, stable version string.
if (
validation_result.package_type == PackageType.MARKETPLACE_PLUGIN
and package.version == "0.0.0"
and resolved_ref.resolved_commit
):
short_sha = resolved_ref.resolved_commit[:7]
package.version = short_sha
# Keep the synthesized apm.yml in sync
apm_yml_path = target_path / "apm.yml"
if apm_yml_path.exists():
from ..utils.yaml_io import dump_yaml, load_yaml

_data = load_yaml(apm_yml_path) or {}
_data["version"] = short_sha
dump_yaml(_data, apm_yml_path)
from .package_validator import stamp_plugin_version

stamp_plugin_version(
package,
validation_result.package_type,
resolved_ref.resolved_commit,
target_path,
)

# Create and return PackageInfo
return PackageInfo(
Expand Down
Loading
Loading