diff --git a/HISTORY.md b/HISTORY.md index 02aaffe91b..0efcc82784 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,7 +1,7 @@ Release History =============== -3.0.0a0 (2023-??-??) +3.0.0b0 (2023-09-??) ------------------- **Removed** @@ -41,6 +41,10 @@ Release History **Added** - Static type annotations thorough the whole package. +- `cert` argument for client authentication with certificate can now pass the password/passphrase using a 3-values tuple (cert, key, password). + The three parameters in the tuple must be of type `str`. +- `verify` argument behavior has been extended and now accept your CA bundle as `str` instead of a path. It also accepts your CA bundle as `bytes` directly. + This help when you do not have access to the fs. **Fixed** - An invalid content-type definition would cause the charset being evaluated to `True`, thus making the program crash. diff --git a/src/niquests/_typing.py b/src/niquests/_typing.py index ff7d61b063..80be2262ff 100644 --- a/src/niquests/_typing.py +++ b/src/niquests/_typing.py @@ -43,10 +43,12 @@ typing.MutableMapping[str, str], CookieJar, ] -#: Either Yes/No, or CA bundle pem location. -TLSVerifyType: typing.TypeAlias = typing.Union[bool, str] -#: Accept a pem certificate (concat cert, key) or an explicit tuple of cert, key pair. -TLSClientCertType: typing.TypeAlias = typing.Union[str, typing.Tuple[str, str]] +#: Either Yes/No, or CA bundle pem location. Or directly the raw bundle content itself. +TLSVerifyType: typing.TypeAlias = typing.Union[bool, str, bytes] +#: Accept a pem certificate (concat cert, key) or an explicit tuple of cert, key pair with an optional password. +TLSClientCertType: typing.TypeAlias = typing.Union[ + str, typing.Tuple[str, str], typing.Tuple[str, str, str] +] #: All accepted ways to describe desired timeout. TimeoutType: typing.TypeAlias = typing.Union[ int, # TotalTimeout diff --git a/src/niquests/adapters.py b/src/niquests/adapters.py index aad58b08d0..328f7b6d84 100644 --- a/src/niquests/adapters.py +++ b/src/niquests/adapters.py @@ -283,27 +283,38 @@ def cert_verify( :param cert: The SSL certificate to verify. """ if url.lower().startswith("https") and verify: - cert_loc = None + cert_loc: str | None = None + cert_data: bytes | None = None - # Allow self-specified cert location. - if verify is not True: - cert_loc = verify + if isinstance(verify, str) and "-----BEGIN CERTIFICATE-----" in verify: + verify = verify.encode("utf-8") - if not cert_loc: - cert_loc = extract_zipped_paths(DEFAULT_CA_BUNDLE_PATH) + if isinstance(verify, bytes): + cert_data = verify + else: + # Allow self-specified cert location. + if verify is not True: + cert_loc = verify - if not cert_loc or not os.path.exists(cert_loc): - raise OSError( - f"Could not find a suitable TLS CA certificate bundle, " - f"invalid path: {cert_loc}" - ) + if not cert_loc: + cert_loc = extract_zipped_paths(DEFAULT_CA_BUNDLE_PATH) + + if not cert_loc or not os.path.exists(cert_loc): + raise OSError( + f"Could not find a suitable TLS CA certificate bundle, " + f"invalid path: {cert_loc}" + ) conn.cert_reqs = "CERT_REQUIRED" - if not os.path.isdir(cert_loc): - conn.ca_certs = cert_loc - else: - conn.ca_cert_dir = cert_loc + if cert_data: + # todo: update HTTPSConnPool in urllib3.future and add it! + conn.ca_data = cert_data # type: ignore[attr-defined] + elif cert_loc: + if not os.path.isdir(cert_loc): + conn.ca_certs = cert_loc + else: + conn.ca_cert_dir = cert_loc else: conn.cert_reqs = "CERT_NONE" conn.ca_certs = None @@ -313,9 +324,11 @@ def cert_verify( if not isinstance(cert, str): conn.cert_file = cert[0] conn.key_file = cert[1] + conn.key_password = cert[2] if len(cert) == 3 else None # type: ignore[misc] else: conn.cert_file = cert conn.key_file = None + conn.key_password = None if conn.cert_file and not os.path.exists(conn.cert_file): raise OSError( f"Could not find the TLS certificate file, "