Skip to content

Commit

Permalink
Support URL also for SSL cert expiration check (#906)
Browse files Browse the repository at this point in the history
* Support arbitrary URL in cert expiration check

* Remove unwanted changes
  • Loading branch information
leplatrem authored Oct 26, 2021
1 parent 1054c1c commit 0d18922
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
29 changes: 15 additions & 14 deletions checks/core/certificate_expiration.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,24 @@


async def fetch_cert(url):
parsed = urlparse(url)
address = (parsed.netloc, parsed.port or 443)

# If the URL points to a server, then fetch the certificate
# using the SSL protocol.
if len(parsed.path) <= 1: # only trailing slash /
try:
# If the URL points to a certificate, then use it as it is.
cert_pem = await fetch_text(url)
parsed = cryptography.x509.load_pem_x509_certificate(
cert_pem.encode("utf8"), backend=crypto_default_backend()
)
except ValueError:
# Otherwise, fetch the SSL certificate from the (host, port).
parsed_url = urlparse(url)
host, port = (parsed_url.netloc, parsed_url.port or 443)
loop = asyncio.get_event_loop()
cert_pem = await loop.run_in_executor(
None, lambda: ssl.get_server_certificate(address)
None, lambda: ssl.get_server_certificate((host, port))
)
# Otherwise fetch the PEM file from the specified URL.
else:
cert_pem = await fetch_text(url)

return cryptography.x509.load_pem_x509_certificate(
cert_pem.encode("utf8"), backend=crypto_default_backend()
)
parsed = cryptography.x509.load_pem_x509_certificate(
cert_pem.encode("utf8"), backend=crypto_default_backend()
)
return parsed


async def run(
Expand Down
8 changes: 6 additions & 2 deletions tests/checks/core/test_certificate_expiration.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ async def test_negative():
assert data == {"expires": next_month.isoformat()}


async def test_fetch_cert():
async def test_fetch_cert(mock_aioresponses):
url = "https://fake.local"
# The check will try to fetch the URL content first.
mock_aioresponses.get(url, body="<html>Something</html>")
# Then will try using SSL.
with mock.patch(
f"{MODULE}.ssl.get_server_certificate", return_value=CERT
) as mocked:
cert = await fetch_cert("https://fake.local")
cert = await fetch_cert(url)
mocked.assert_called_with(("fake.local", 443))

assert cert.not_valid_after == datetime(2019, 11, 11, 22, 44, 31)
Expand Down

0 comments on commit 0d18922

Please sign in to comment.