From 9412b3b9c02c436a206678657fbbc1a0eeab3bd1 Mon Sep 17 00:00:00 2001 From: "Robin H. Johnson" Date: Sun, 5 Feb 2023 22:32:38 -0800 Subject: [PATCH 1/4] openpgp: implement WKD advanced lookup Signed-off-by: Robin H. Johnson --- gemato/openpgp.py | 41 ++++++++++++++++++++++++++++++++++++++--- tests/test_openpgp.py | 32 +++++++++++++++++++++----------- 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/gemato/openpgp.py b/gemato/openpgp.py index fb178af..7af5c67 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -18,6 +18,8 @@ import typing import urllib.parse import warnings +import socket +import functools from pathlib import Path @@ -96,14 +98,47 @@ def primary_key_fingerprint(self) -> str: b'ybndrfg8ejkmcpqxot1uwisza345h769') -def get_wkd_url(email): +def _get_wkd_inputs(email): localname, domain = email.split('@', 1) b32 = ( base64.b32encode( hashlib.sha1(localname.encode('utf8').lower()).digest()) .translate(ZBASE32_TRANSLATE).decode('ASCII')) - return (f'https://{domain.lower()}/.well-known/openpgpkey/hu/' - f'{b32}?l={urllib.parse.quote(localname)}') + return localname, domain, b32 + + +@functools.lru_cache +def has_advanced_wkd(domain): + subdomain = 'openpgpkey.' + domain.lower() + res = socket.getaddrinfo(subdomain, 443, + type=socket.SOCK_STREAM, + proto=socket.IPPROTO_TCP + ) + return len(res) > 0 + + +@functools.lru_cache +def get_wkd_url(email): + # https://datatracker.ietf.org/doc/draft-koch-openpgp-webkey-service/15/ + # There are two variants on how to form the request URI: The advanced + # and the direct method. Implementations MUST first try the advanced + # method. Only if an address for the required sub-domain does not + # exist, they SHOULD fall back to the direct method. + # + # Translation: + # 1. Try to resolve the hostname for advanced method first. + # 2. If if does NOT resolve, fall back to direct method. + localname, domain, b32 = _get_wkd_inputs(email) + if has_advanced_wkd(domain): + # https://openpgpkey.example.org/.well-known/openpgpkey/example.org/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe + return (f'https://openpgpkey.{domain.lower()}/' + f'.well-known/openpgpkey/{domain.lower()}/hu/' + f'{b32}?l={urllib.parse.quote(localname)}') + else: + # https://example.org/.well-known/openpgpkey/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe + return (f'https://{domain.lower()}/' + f'.well-known/openpgpkey/hu/' + f'{b32}?l={urllib.parse.quote(localname)}') class SystemGPGEnvironment: diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 0c43fc6..a0cf917 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -879,6 +879,7 @@ def hkp_server(global_hkp_server): def test_refresh_hkp(openpgp_env_with_refresh, hkp_server, manifest_var, key_var, server_key_fpr, server_key_var, expected): """Test refreshing against a HKP keyserver""" + # TODO: Need to mock socket.getaddrinfo to test this safely try: if key_var is not None: with io.BytesIO(globals()[key_var]) as f: @@ -925,18 +926,27 @@ def test_refresh_wkd(openpgp_env_with_refresh, openpgp_env_with_refresh.import_key(f) if server_key_var is not None: - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - body=globals()[server_key_var], - content_type='application/pgp-keys') + body_ = globals()[server_key_var] + content_type_ = 'application/pgp-keys' + status_ = 200 else: - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - status=404) + body_ = None + content_type_ = None + status_ = 404 + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + body=body_, + content_type=content_type_, + status=status_) + responses.add( + responses.GET, + 'https://openpgpkey.example.com/.well-known/openpgpkey/example.org/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + body=body_, + content_type=content_type_, + status=status_) if expected is None: openpgp_env_with_refresh.refresh_keys( From 7b762f488e9b028494c81faf12c1eaa4d8d00fe5 Mon Sep 17 00:00:00 2001 From: Alec Warner Date: Tue, 7 Feb 2023 10:49:01 -0800 Subject: [PATCH 2/4] tests/test_openpgp: fix WKD testing Signed-off-by: Robin H. Johnson --- tests/test_openpgp.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index a0cf917..65b5254 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -35,6 +35,7 @@ IsolatedGPGEnvironment, PGPyEnvironment, get_wkd_url, + has_advanced_wkd, OpenPGPSignatureList, OpenPGPSignatureData, OpenPGPSignatureStatus, @@ -877,9 +878,9 @@ def hkp_server(global_hkp_server): 'VALID_PUBLIC_KEY', None), ]) def test_refresh_hkp(openpgp_env_with_refresh, hkp_server, manifest_var, - key_var, server_key_fpr, server_key_var, expected): + key_var, server_key_fpr, server_key_var, expected, monkeypatch): """Test refreshing against a HKP keyserver""" - # TODO: Need to mock socket.getaddrinfo to test this safely + monkeypatch.setattr(gemato.openpgp, 'has_advanced_wkd', lambda _: True) try: if key_var is not None: with io.BytesIO(globals()[key_var]) as f: @@ -992,15 +993,16 @@ def test_refresh_wkd_fallback_to_hkp(openpgp_env_with_refresh, @pytest.mark.parametrize( - 'email,expected', - [('gemato@example.com', + 'email,has_advanced_wkd,expected', + [('gemato@example.com', False, 'https://example.com/.well-known/openpgpkey/hu/' '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato'), - ('Joe.Doe@Example.ORG', - 'https://example.org/.well-known/openpgpkey/hu/' + ('Joe.Doe@Example.ORG', True, + 'https://openpgpkey.example.org/.well-known/openpgpkey/example.org/hu/' 'iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe'), ]) -def test_get_wkd_url(email, expected): +def test_get_wkd_url(email, has_advanced_wkd, expected, monkeypatch): + monkeypatch.setattr(gemato.openpgp, 'has_advanced_wkd', lambda _: has_advanced_wkd) assert get_wkd_url(email) == expected From c4cda2e84a7375cddfe3c0bb911b0c620d828b3f Mon Sep 17 00:00:00 2001 From: "Robin H. Johnson" Date: Fri, 21 Apr 2023 14:09:58 -0700 Subject: [PATCH 3/4] tests/test_openpgp: fix content type that changed during RFC development Signed-off-by: Robin H. Johnson --- tests/test_openpgp.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 65b5254..59e7941 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -928,7 +928,11 @@ def test_refresh_wkd(openpgp_env_with_refresh, if server_key_var is not None: body_ = globals()[server_key_var] - content_type_ = 'application/pgp-keys' + # The content-type changed over the development of the RFC, section 3.1. + # https://datatracker.ietf.org/doc/draft-koch-openpgp-webkey-service/15/ + # 00 = The server SHOULD return "application/pgp-key" as the content-type + # 01 = The server SHOULD return "application/octet-string" as the content-type + content_type_ = 'application/octet-stream' status_ = 200 else: body_ = None From c6a2eb88074e057a437d5f616fe65d213f911fcb Mon Sep 17 00:00:00 2001 From: "Robin H. Johnson" Date: Fri, 21 Apr 2023 14:16:55 -0700 Subject: [PATCH 4/4] tests/test_openpgp: ensure HEAD & Content-Length work in tests Signed-off-by: Robin H. Johnson --- tests/test_openpgp.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 59e7941..12f59f7 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -934,24 +934,25 @@ def test_refresh_wkd(openpgp_env_with_refresh, # 01 = The server SHOULD return "application/octet-string" as the content-type content_type_ = 'application/octet-stream' status_ = 200 + headers_ = { 'Content-Length': len(body_) } # needed esp. for HEAD else: body_ = None content_type_ = None status_ = 404 - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - body=body_, - content_type=content_type_, - status=status_) - responses.add( - responses.GET, - 'https://openpgpkey.example.com/.well-known/openpgpkey/example.org/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - body=body_, - content_type=content_type_, - status=status_) + headers_ = None + url1 = 'https://example.com/.well-known/openpgpkey/hu/' \ + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato' + url2 = 'https://openpgpkey.example.com/.well-known/openpgpkey/example.org/hu/' \ + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato' + for method_ in [responses.GET, responses.HEAD]: + for url_ in [url1, url2]: + body__ = body_ if method_ == responses.HEAD else None + responses.add( + method_, url_, + body=body__, + content_type=content_type_, + headers=headers_, + status=status_) if expected is None: openpgp_env_with_refresh.refresh_keys(