diff --git a/gemato/openpgp.py b/gemato/openpgp.py index fb178af..7af5c67 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -18,6 +18,8 @@ import typing import urllib.parse import warnings +import socket +import functools from pathlib import Path @@ -96,14 +98,47 @@ def primary_key_fingerprint(self) -> str: b'ybndrfg8ejkmcpqxot1uwisza345h769') -def get_wkd_url(email): +def _get_wkd_inputs(email): localname, domain = email.split('@', 1) b32 = ( base64.b32encode( hashlib.sha1(localname.encode('utf8').lower()).digest()) .translate(ZBASE32_TRANSLATE).decode('ASCII')) - return (f'https://{domain.lower()}/.well-known/openpgpkey/hu/' - f'{b32}?l={urllib.parse.quote(localname)}') + return localname, domain, b32 + + +@functools.lru_cache +def has_advanced_wkd(domain): + subdomain = 'openpgpkey.' + domain.lower() + res = socket.getaddrinfo(subdomain, 443, + type=socket.SOCK_STREAM, + proto=socket.IPPROTO_TCP + ) + return len(res) > 0 + + +@functools.lru_cache +def get_wkd_url(email): + # https://datatracker.ietf.org/doc/draft-koch-openpgp-webkey-service/15/ + # There are two variants on how to form the request URI: The advanced + # and the direct method. Implementations MUST first try the advanced + # method. Only if an address for the required sub-domain does not + # exist, they SHOULD fall back to the direct method. + # + # Translation: + # 1. Try to resolve the hostname for advanced method first. + # 2. If if does NOT resolve, fall back to direct method. + localname, domain, b32 = _get_wkd_inputs(email) + if has_advanced_wkd(domain): + # https://openpgpkey.example.org/.well-known/openpgpkey/example.org/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe + return (f'https://openpgpkey.{domain.lower()}/' + f'.well-known/openpgpkey/{domain.lower()}/hu/' + f'{b32}?l={urllib.parse.quote(localname)}') + else: + # https://example.org/.well-known/openpgpkey/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe + return (f'https://{domain.lower()}/' + f'.well-known/openpgpkey/hu/' + f'{b32}?l={urllib.parse.quote(localname)}') class SystemGPGEnvironment: diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 0c43fc6..12f59f7 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -35,6 +35,7 @@ IsolatedGPGEnvironment, PGPyEnvironment, get_wkd_url, + has_advanced_wkd, OpenPGPSignatureList, OpenPGPSignatureData, OpenPGPSignatureStatus, @@ -877,8 +878,9 @@ def hkp_server(global_hkp_server): 'VALID_PUBLIC_KEY', None), ]) def test_refresh_hkp(openpgp_env_with_refresh, hkp_server, manifest_var, - key_var, server_key_fpr, server_key_var, expected): + key_var, server_key_fpr, server_key_var, expected, monkeypatch): """Test refreshing against a HKP keyserver""" + monkeypatch.setattr(gemato.openpgp, 'has_advanced_wkd', lambda _: True) try: if key_var is not None: with io.BytesIO(globals()[key_var]) as f: @@ -925,18 +927,32 @@ def test_refresh_wkd(openpgp_env_with_refresh, openpgp_env_with_refresh.import_key(f) if server_key_var is not None: - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - body=globals()[server_key_var], - content_type='application/pgp-keys') + body_ = globals()[server_key_var] + # The content-type changed over the development of the RFC, section 3.1. + # https://datatracker.ietf.org/doc/draft-koch-openpgp-webkey-service/15/ + # 00 = The server SHOULD return "application/pgp-key" as the content-type + # 01 = The server SHOULD return "application/octet-string" as the content-type + content_type_ = 'application/octet-stream' + status_ = 200 + headers_ = { 'Content-Length': len(body_) } # needed esp. for HEAD else: - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - status=404) + body_ = None + content_type_ = None + status_ = 404 + headers_ = None + url1 = 'https://example.com/.well-known/openpgpkey/hu/' \ + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato' + url2 = 'https://openpgpkey.example.com/.well-known/openpgpkey/example.org/hu/' \ + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato' + for method_ in [responses.GET, responses.HEAD]: + for url_ in [url1, url2]: + body__ = body_ if method_ == responses.HEAD else None + responses.add( + method_, url_, + body=body__, + content_type=content_type_, + headers=headers_, + status=status_) if expected is None: openpgp_env_with_refresh.refresh_keys( @@ -982,15 +998,16 @@ def test_refresh_wkd_fallback_to_hkp(openpgp_env_with_refresh, @pytest.mark.parametrize( - 'email,expected', - [('gemato@example.com', + 'email,has_advanced_wkd,expected', + [('gemato@example.com', False, 'https://example.com/.well-known/openpgpkey/hu/' '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato'), - ('Joe.Doe@Example.ORG', - 'https://example.org/.well-known/openpgpkey/hu/' + ('Joe.Doe@Example.ORG', True, + 'https://openpgpkey.example.org/.well-known/openpgpkey/example.org/hu/' 'iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe'), ]) -def test_get_wkd_url(email, expected): +def test_get_wkd_url(email, has_advanced_wkd, expected, monkeypatch): + monkeypatch.setattr(gemato.openpgp, 'has_advanced_wkd', lambda _: has_advanced_wkd) assert get_wkd_url(email) == expected