Skip to content

Commit

Permalink
openpgp: implement WKD advanced lookup
Browse files Browse the repository at this point in the history
Signed-off-by: Robin H. Johnson <[email protected]>
  • Loading branch information
robbat2 committed Feb 6, 2023
1 parent 5e683c7 commit 6b8ff71
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
41 changes: 38 additions & 3 deletions gemato/openpgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import typing
import urllib.parse
import warnings
import socket
import functools

from pathlib import Path

Expand Down Expand Up @@ -95,14 +97,47 @@ def primary_key_fingerprint(self) -> str:
b'ybndrfg8ejkmcpqxot1uwisza345h769')


def get_wkd_url(email):
def _get_wkd_inputs(email):
localname, domain = email.split('@', 1)
b32 = (
base64.b32encode(
hashlib.sha1(localname.encode('utf8').lower()).digest())
.translate(ZBASE32_TRANSLATE).decode('ASCII'))
return (f'https://{domain.lower()}/.well-known/openpgpkey/hu/'
f'{b32}?l={urllib.parse.quote(localname)}')
return localname, domain, b32


@functools.lru_cache
def has_advanced_wkd(domain):
subdomain = 'openpgpkey.' + domain.lower()
res = socket.getaddrinfo(subdomain, 443,
type=socket.SOCK_STREAM,
proto=socket.IPPROTO_TCP
)
return len(res) > 0


@functools.lru_cache
def get_wkd_url(email):
# https://datatracker.ietf.org/doc/draft-koch-openpgp-webkey-service/15/
# There are two variants on how to form the request URI: The advanced
# and the direct method. Implementations MUST first try the advanced
# method. Only if an address for the required sub-domain does not
# exist, they SHOULD fall back to the direct method.
#
# Translation:
# 1. Try to resolve the hostname for advanced method first.
# 2. If if does NOT resolve, fall back to direct method.
localname, domain, b32 = _get_wkd_inputs(email)
if has_advanced_wkd(domain):
# https://openpgpkey.example.org/.well-known/openpgpkey/example.org/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe
return (f'https://openpgpkey.{domain.lower()}/'
f'.well-known/openpgpkey/{domain.lower()}/hu/'
f'{b32}?l={urllib.parse.quote(localname)}')
else:
# https://example.org/.well-known/openpgpkey/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe
return (f'https://{domain.lower()}/'
f'.well-known/openpgpkey/hu/'
f'{b32}?l={urllib.parse.quote(localname)}')


class SystemGPGEnvironment:
Expand Down
32 changes: 21 additions & 11 deletions tests/test_openpgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,7 @@ def hkp_server(global_hkp_server):
def test_refresh_hkp(openpgp_env_with_refresh, hkp_server, manifest_var,
key_var, server_key_fpr, server_key_var, expected):
"""Test refreshing against a HKP keyserver"""
# TODO: Need to mock socket.getaddrinfo to test this safely
try:
if key_var is not None:
with io.BytesIO(globals()[key_var]) as f:
Expand Down Expand Up @@ -902,18 +903,27 @@ def test_refresh_wkd(openpgp_env_with_refresh,
openpgp_env_with_refresh.import_key(f)

if server_key_var is not None:
responses.add(
responses.GET,
'https://example.com/.well-known/openpgpkey/hu/'
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
body=globals()[server_key_var],
content_type='application/pgp-keys')
body_ = globals()[server_key_var]
content_type_ = 'application/pgp-keys'
status_ = 200
else:
responses.add(
responses.GET,
'https://example.com/.well-known/openpgpkey/hu/'
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
status=404)
body_ = None
content_type_ = None
status_ = 404
responses.add(
responses.GET,
'https://example.com/.well-known/openpgpkey/hu/'
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
body=body_,
content_type=content_type_,
status=status_)
responses.add(
responses.GET,
'https://openpgpkey.example.com/.well-known/openpgpkey/example.org/hu/'
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
body=body_,
content_type=content_type_,
status=status_)

if expected is None:
openpgp_env_with_refresh.refresh_keys(
Expand Down

0 comments on commit 6b8ff71

Please sign in to comment.