Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
functionality works as expected.
(Jelmer Vernooij, #780)

* Add callback-based authentication support for HTTP and proxy authentication
in ``Urllib3HttpGitClient``. This allows applications to handle
authentication dynamically without intercepting exceptions. Callbacks
receive the authentication scheme information (via WWW-Authenticate or
Proxy-Authenticate headers) and can provide credentials or cancel.
(Jelmer Vernooij, #822)

0.23.1 2025-06-30

* Support ``untracked_files="normal"`` argument to ``porcelain.status``,
Expand Down
154 changes: 150 additions & 4 deletions dulwich/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2325,27 +2325,112 @@ def default_user_agent_string():
return "git/dulwich/{}".format(".".join([str(x) for x in dulwich.__version__]))


class AuthCallbackPoolManager:
"""Pool manager wrapper that handles authentication callbacks."""

def __init__(self, pool_manager, auth_callback=None, proxy_auth_callback=None):
self._pool_manager = pool_manager
self._auth_callback = auth_callback
self._proxy_auth_callback = proxy_auth_callback
self._auth_attempts = {}

def __getattr__(self, name):
# Delegate all other attributes to the wrapped pool manager
return getattr(self._pool_manager, name)

def request(self, method, url, *args, **kwargs):
"""Make HTTP request with authentication callback support."""
max_attempts = 3
attempts = self._auth_attempts.get(url, 0)

while attempts < max_attempts:
response = self._pool_manager.request(method, url, *args, **kwargs)

if response.status == 401 and self._auth_callback:
# HTTP authentication required
www_authenticate = response.headers.get("WWW-Authenticate", "")
attempts += 1
self._auth_attempts[url] = attempts

# Call the authentication callback
credentials = self._auth_callback(url, www_authenticate, attempts)
if credentials:
# Update request with new credentials
import urllib3.util

auth_header = urllib3.util.make_headers(
basic_auth=f"{credentials['username']}:{credentials.get('password', '')}"
)
if "headers" in kwargs:
kwargs["headers"].update(auth_header)
else:
kwargs["headers"] = auth_header
# Retry the request
continue

elif response.status == 407 and self._proxy_auth_callback:
# Proxy authentication required
proxy_authenticate = response.headers.get("Proxy-Authenticate", "")
attempts += 1
self._auth_attempts[url] = attempts

# Call the proxy authentication callback
credentials = self._proxy_auth_callback(
url, proxy_authenticate, attempts
)
if credentials:
# Update request with new proxy credentials
import urllib3.util

proxy_auth_header = urllib3.util.make_headers(
proxy_basic_auth=f"{credentials['username']}:{credentials.get('password', '')}"
)
if "headers" in kwargs:
kwargs["headers"].update(proxy_auth_header)
else:
kwargs["headers"] = proxy_auth_header
# Retry the request
continue

# Clear attempts on success or non-auth failure
if url in self._auth_attempts:
del self._auth_attempts[url]
return response

# Max attempts reached
return response


def default_urllib3_manager(
config,
pool_manager_cls=None,
proxy_manager_cls=None,
base_url=None,
timeout=None,
auth_callback=None,
proxy_auth_callback=None,
**override_kwargs,
) -> Union["urllib3.ProxyManager", "urllib3.PoolManager"]:
) -> Union["urllib3.ProxyManager", "urllib3.PoolManager", "AuthCallbackPoolManager"]:
"""Return urllib3 connection pool manager.

Honour detected proxy configurations.

Args:
config: `dulwich.config.ConfigDict` instance with Git configuration.
timeout: Timeout for HTTP requests in seconds
auth_callback: Callback function for HTTP authentication.
Called with (url, www_authenticate_header, attempt_number).
Should return {'username': str, 'password': str} or None.
proxy_auth_callback: Callback function for proxy authentication.
Called with (url, proxy_authenticate_header, attempt_number).
Should return {'username': str, 'password': str} or None.
override_kwargs: Additional arguments for `urllib3.ProxyManager`

Returns:
Either pool_manager_cls (defaults to `urllib3.ProxyManager`) instance for
proxy configurations, proxy_manager_cls
(defaults to `urllib3.PoolManager`) instance otherwise
(defaults to `urllib3.PoolManager`) instance otherwise.
If auth callbacks are provided, returns AuthCallbackPoolManager wrapper.

"""
proxy_server = user_agent = None
Expand Down Expand Up @@ -2416,12 +2501,37 @@ def default_urllib3_manager(

import urllib3

# Check for proxy authentication method configuration
proxy_auth_method = None
if config is not None:
try:
proxy_auth_method = config.get(b"http", b"proxyAuthMethod")
if proxy_auth_method and isinstance(proxy_auth_method, bytes):
proxy_auth_method = proxy_auth_method.decode().lower()
except KeyError:
pass

# Check environment variable override
env_proxy_auth = os.environ.get("GIT_HTTP_PROXY_AUTHMETHOD")
if env_proxy_auth:
proxy_auth_method = env_proxy_auth.lower()

if proxy_server is not None:
if proxy_manager_cls is None:
proxy_manager_cls = urllib3.ProxyManager
if not isinstance(proxy_server, str):
proxy_server = proxy_server.decode()
proxy_server_url = urlparse(proxy_server)

# Validate proxy auth method if specified
if proxy_auth_method and proxy_auth_method not in ("anyauth", "basic"):
# Only basic and anyauth are currently supported
# Other methods like digest, negotiate, ntlm would require additional libraries
raise NotImplementedError(
f"Proxy authentication method '{proxy_auth_method}' is not supported. "
"Only 'basic' and 'anyauth' are currently supported."
)

if proxy_server_url.username is not None:
proxy_headers = urllib3.make_headers(
proxy_basic_auth=f"{proxy_server_url.username}:{proxy_server_url.password or ''}" # type: ignore
Expand All @@ -2436,6 +2546,10 @@ def default_urllib3_manager(
pool_manager_cls = urllib3.PoolManager
manager = pool_manager_cls(headers=headers, **kwargs)

# Wrap with auth callback support if callbacks are provided
if auth_callback or proxy_auth_callback:
manager = AuthCallbackPoolManager(manager, auth_callback, proxy_auth_callback)

return manager


Expand Down Expand Up @@ -2946,6 +3060,23 @@ def wrapper(*args, **kwargs):


class Urllib3HttpGitClient(AbstractHttpGitClient):
"""HTTP Git client using urllib3.

Supports callback-based authentication for both HTTP and proxy authentication,
allowing dynamic credential handling without intercepting exceptions.

Example:
>>> def auth_callback(url, www_authenticate, attempt):
... # Parse www_authenticate header to determine auth scheme
... # Return credentials or None to cancel
... return {"username": "user", "password": "pass"}
>>>
>>> client = Urllib3HttpGitClient(
... "https://github.com/private/repo.git",
... auth_callback=auth_callback
... )
"""

def __init__(
self,
base_url,
Expand All @@ -2955,18 +3086,33 @@ def __init__(
username=None,
password=None,
timeout=None,
auth_callback=None,
proxy_auth_callback=None,
**kwargs,
) -> None:
self._username = username
self._password = password
self._timeout = timeout
self._auth_callback = auth_callback
self._proxy_auth_callback = proxy_auth_callback

if pool_manager is None:
self.pool_manager = default_urllib3_manager(
config, base_url=base_url, timeout=timeout
config,
base_url=base_url,
timeout=timeout,
auth_callback=auth_callback,
proxy_auth_callback=proxy_auth_callback,
)
else:
self.pool_manager = pool_manager
# If a custom pool_manager is provided and callbacks are specified,
# wrap it with AuthCallbackPoolManager
if auth_callback or proxy_auth_callback:
self.pool_manager = AuthCallbackPoolManager(
pool_manager, auth_callback, proxy_auth_callback
)
else:
self.pool_manager = pool_manager

if username is not None:
# No escaping needed: ":" is not allowed in username:
Expand Down
81 changes: 81 additions & 0 deletions examples/auth_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Example of using callback-based authentication with dulwich HTTP client.

This example demonstrates how to use the new callback-based authentication
feature to handle HTTP and proxy authentication dynamically.

Note: Dulwich currently supports 'basic' and 'anyauth' proxy authentication
methods via the http.proxyAuthMethod git config option or the
GIT_HTTP_PROXY_AUTHMETHOD environment variable. Other methods like 'digest',
'negotiate', and 'ntlm' will raise NotImplementedError.
"""

from dulwich.client import HttpGitClient


def my_auth_callback(url, www_authenticate, attempt):
"""Callback function for HTTP authentication.

Args:
url: The URL that requires authentication
www_authenticate: The WWW-Authenticate header value from the server
attempt: The attempt number (starts at 1)

Returns:
dict: Credentials with 'username' and 'password' keys, or None to cancel
"""
print(f"Authentication required for {url}")
print(f"Server says: {www_authenticate}")
print(f"Attempt {attempt} of 3")

# In a real application, you might:
# - Prompt the user for credentials
# - Look up credentials in a password manager
# - Parse the www_authenticate header to determine the auth scheme

if attempt <= 2:
# Example: return hardcoded credentials for demo
return {"username": "myuser", "password": "mypassword"}
else:
# Give up after 2 attempts
return None


def my_proxy_auth_callback(url, proxy_authenticate, attempt):
"""Callback function for proxy authentication.

Args:
url: The URL being accessed through the proxy
proxy_authenticate: The Proxy-Authenticate header value from the proxy
attempt: The attempt number (starts at 1)

Returns:
dict: Credentials with 'username' and 'password' keys, or None to cancel
"""
print(f"Proxy authentication required for accessing {url}")
print(f"Proxy says: {proxy_authenticate}")

# Return proxy credentials
return {"username": "proxyuser", "password": "proxypass"}


def main():
# Create an HTTP Git client with authentication callbacks
client = HttpGitClient(
"https://github.com/private/repo.git",
auth_callback=my_auth_callback,
proxy_auth_callback=my_proxy_auth_callback,
)

# Now when you use the client, it will call your callbacks
# if authentication is required
try:
# Example: fetch refs from the repository
refs = client.fetch_refs("https://github.com/private/repo.git")
print(f"Successfully fetched refs: {refs}")
except Exception as e:
print(f"Error: {e}")


if __name__ == "__main__":
main()
Loading