Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openpgp: implement WKD advanced lookup #27

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions gemato/openpgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import typing
import urllib.parse
import warnings
import socket
import functools

from pathlib import Path

Expand Down Expand Up @@ -96,14 +98,47 @@ def primary_key_fingerprint(self) -> str:
b'ybndrfg8ejkmcpqxot1uwisza345h769')


def get_wkd_url(email):
def _get_wkd_inputs(email):
localname, domain = email.split('@', 1)
b32 = (
base64.b32encode(
hashlib.sha1(localname.encode('utf8').lower()).digest())
.translate(ZBASE32_TRANSLATE).decode('ASCII'))
return (f'https://{domain.lower()}/.well-known/openpgpkey/hu/'
f'{b32}?l={urllib.parse.quote(localname)}')
return localname, domain, b32


@functools.lru_cache
def has_advanced_wkd(domain):
subdomain = 'openpgpkey.' + domain.lower()
res = socket.getaddrinfo(subdomain, 443,
type=socket.SOCK_STREAM,
proto=socket.IPPROTO_TCP
)
return len(res) > 0


@functools.lru_cache
def get_wkd_url(email):
# https://datatracker.ietf.org/doc/draft-koch-openpgp-webkey-service/15/
# There are two variants on how to form the request URI: The advanced
# and the direct method. Implementations MUST first try the advanced
# method. Only if an address for the required sub-domain does not
# exist, they SHOULD fall back to the direct method.
#
# Translation:
# 1. Try to resolve the hostname for advanced method first.
# 2. If if does NOT resolve, fall back to direct method.
localname, domain, b32 = _get_wkd_inputs(email)
if has_advanced_wkd(domain):
# https://openpgpkey.example.org/.well-known/openpgpkey/example.org/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe
return (f'https://openpgpkey.{domain.lower()}/'
f'.well-known/openpgpkey/{domain.lower()}/hu/'
f'{b32}?l={urllib.parse.quote(localname)}')
else:
# https://example.org/.well-known/openpgpkey/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe
return (f'https://{domain.lower()}/'
f'.well-known/openpgpkey/hu/'
f'{b32}?l={urllib.parse.quote(localname)}')


class SystemGPGEnvironment:
Expand Down
51 changes: 34 additions & 17 deletions tests/test_openpgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
IsolatedGPGEnvironment,
PGPyEnvironment,
get_wkd_url,
has_advanced_wkd,
OpenPGPSignatureList,
OpenPGPSignatureData,
OpenPGPSignatureStatus,
Expand Down Expand Up @@ -877,8 +878,9 @@ def hkp_server(global_hkp_server):
'VALID_PUBLIC_KEY', None),
])
def test_refresh_hkp(openpgp_env_with_refresh, hkp_server, manifest_var,
key_var, server_key_fpr, server_key_var, expected):
key_var, server_key_fpr, server_key_var, expected, monkeypatch):
"""Test refreshing against a HKP keyserver"""
monkeypatch.setattr(gemato.openpgp, 'has_advanced_wkd', lambda _: True)
try:
if key_var is not None:
with io.BytesIO(globals()[key_var]) as f:
Expand Down Expand Up @@ -925,18 +927,32 @@ def test_refresh_wkd(openpgp_env_with_refresh,
openpgp_env_with_refresh.import_key(f)

if server_key_var is not None:
responses.add(
responses.GET,
'https://example.com/.well-known/openpgpkey/hu/'
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
body=globals()[server_key_var],
content_type='application/pgp-keys')
body_ = globals()[server_key_var]
# The content-type changed over the development of the RFC, section 3.1.
# https://datatracker.ietf.org/doc/draft-koch-openpgp-webkey-service/15/
# 00 = The server SHOULD return "application/pgp-key" as the content-type
# 01 = The server SHOULD return "application/octet-string" as the content-type
content_type_ = 'application/octet-stream'
status_ = 200
headers_ = { 'Content-Length': len(body_) } # needed esp. for HEAD
else:
responses.add(
responses.GET,
'https://example.com/.well-known/openpgpkey/hu/'
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
status=404)
body_ = None
content_type_ = None
status_ = 404
headers_ = None
url1 = 'https://example.com/.well-known/openpgpkey/hu/' \
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato'
url2 = 'https://openpgpkey.example.com/.well-known/openpgpkey/example.org/hu/' \
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato'
for method_ in [responses.GET, responses.HEAD]:
for url_ in [url1, url2]:
body__ = body_ if method_ == responses.HEAD else None
responses.add(
method_, url_,
body=body__,
content_type=content_type_,
headers=headers_,
status=status_)

if expected is None:
openpgp_env_with_refresh.refresh_keys(
Expand Down Expand Up @@ -982,15 +998,16 @@ def test_refresh_wkd_fallback_to_hkp(openpgp_env_with_refresh,


@pytest.mark.parametrize(
'email,expected',
[('[email protected]',
'email,has_advanced_wkd,expected',
[('[email protected]', False,
'https://example.com/.well-known/openpgpkey/hu/'
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato'),
('[email protected]',
'https://example.org/.well-known/openpgpkey/hu/'
('[email protected]', True,
'https://openpgpkey.example.org/.well-known/openpgpkey/example.org/hu/'
'iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe'),
])
def test_get_wkd_url(email, expected):
def test_get_wkd_url(email, has_advanced_wkd, expected, monkeypatch):
monkeypatch.setattr(gemato.openpgp, 'has_advanced_wkd', lambda _: has_advanced_wkd)
assert get_wkd_url(email) == expected


Expand Down