From 0d18922eea1e2022cac46ff9dffb90097ac77aa9 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Tue, 26 Oct 2021 16:38:11 +0200 Subject: [PATCH] Support URL also for SSL cert expiration check (#906) * Support arbitrary URL in cert expiration check * Remove unwanted changes --- checks/core/certificate_expiration.py | 29 ++++++++++--------- .../core/test_certificate_expiration.py | 8 +++-- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/checks/core/certificate_expiration.py b/checks/core/certificate_expiration.py index 50b1a439..86d312d2 100644 --- a/checks/core/certificate_expiration.py +++ b/checks/core/certificate_expiration.py @@ -29,23 +29,24 @@ async def fetch_cert(url): - parsed = urlparse(url) - address = (parsed.netloc, parsed.port or 443) - - # If the URL points to a server, then fetch the certificate - # using the SSL protocol. - if len(parsed.path) <= 1: # only trailing slash / + try: + # If the URL points to a certificate, then use it as it is. + cert_pem = await fetch_text(url) + parsed = cryptography.x509.load_pem_x509_certificate( + cert_pem.encode("utf8"), backend=crypto_default_backend() + ) + except ValueError: + # Otherwise, fetch the SSL certificate from the (host, port). + parsed_url = urlparse(url) + host, port = (parsed_url.netloc, parsed_url.port or 443) loop = asyncio.get_event_loop() cert_pem = await loop.run_in_executor( - None, lambda: ssl.get_server_certificate(address) + None, lambda: ssl.get_server_certificate((host, port)) ) - # Otherwise fetch the PEM file from the specified URL. - else: - cert_pem = await fetch_text(url) - - return cryptography.x509.load_pem_x509_certificate( - cert_pem.encode("utf8"), backend=crypto_default_backend() - ) + parsed = cryptography.x509.load_pem_x509_certificate( + cert_pem.encode("utf8"), backend=crypto_default_backend() + ) + return parsed async def run( diff --git a/tests/checks/core/test_certificate_expiration.py b/tests/checks/core/test_certificate_expiration.py index 378f4eb7..f4547df9 100644 --- a/tests/checks/core/test_certificate_expiration.py +++ b/tests/checks/core/test_certificate_expiration.py @@ -57,11 +57,15 @@ async def test_negative(): assert data == {"expires": next_month.isoformat()} -async def test_fetch_cert(): +async def test_fetch_cert(mock_aioresponses): + url = "https://fake.local" + # The check will try to fetch the URL content first. + mock_aioresponses.get(url, body="Something") + # Then will try using SSL. with mock.patch( f"{MODULE}.ssl.get_server_certificate", return_value=CERT ) as mocked: - cert = await fetch_cert("https://fake.local") + cert = await fetch_cert(url) mocked.assert_called_with(("fake.local", 443)) assert cert.not_valid_after == datetime(2019, 11, 11, 22, 44, 31)