diff --git a/checks/core/certificate_expiration.py b/checks/core/certificate_expiration.py new file mode 100644 index 00000000..51b6fdb8 --- /dev/null +++ b/checks/core/certificate_expiration.py @@ -0,0 +1,69 @@ +""" +SSL certificate should not expire for at least some minimum number of days. + +Returns expiration date. +""" +import asyncio +import datetime +import logging +import ssl +from urllib.parse import urlparse + +import cryptography +import cryptography.x509 +from cryptography.hazmat.backends import default_backend as crypto_default_backend + +from poucave.typings import CheckResult +from poucave.utils import utcnow + + +logger = logging.getLogger(__name__) + + +EXPOSED_PARAMETERS = ["url", "min_remaining_days"] + + +# Bound the alert thresholds. +LOWER_MIN_REMAINING_DAYS = 10 +UPPER_MIN_REMAINING_DAYS = 30 # No need to warn more than 1 month in advance. + + +async def fetch_cert(url): + parsed = urlparse(url) + address = (parsed.netloc, parsed.port or 443) + loop = asyncio.get_event_loop() + cert_pem = await loop.run_in_executor( + None, lambda: ssl.get_server_certificate(address) + ) + return cryptography.x509.load_pem_x509_certificate( + cert_pem.encode("utf8"), backend=crypto_default_backend() + ) + + +async def run( + url: str, + percentage_remaining_validity: int = 5, + min_remaining_days: int = LOWER_MIN_REMAINING_DAYS, + max_remaining_days: int = UPPER_MIN_REMAINING_DAYS, +) -> CheckResult: + cert = await fetch_cert(url) + start = cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) + end = cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) + lifespan = (end - start).days + + # The minimum remaining days depends on the certificate lifespan. + relative_minimum = lifespan * percentage_remaining_validity / 100 + bounded_minimum = int( + min(UPPER_MIN_REMAINING_DAYS, max(min_remaining_days, relative_minimum)) + ) + remaining_days = (end - utcnow()).days + + logger.debug( + f"Certificate lasts {lifespan} days and ends in {remaining_days} days " + f"({remaining_days - bounded_minimum} days before alert)." + ) + + success = remaining_days > bounded_minimum + return success, { + "expires": end.isoformat(), + } diff --git a/tests/checks/core/test_certificate_expiration.py b/tests/checks/core/test_certificate_expiration.py new file mode 100644 index 00000000..420752d6 --- /dev/null +++ b/tests/checks/core/test_certificate_expiration.py @@ -0,0 +1,67 @@ +from datetime import datetime, timedelta +from unittest import mock + +from checks.core.certificate_expiration import fetch_cert, run +from poucave.utils import utcnow + + +MODULE = "checks.core.certificate_expiration" + +CERT = """ +-----BEGIN CERTIFICATE----- +MIIDBTCCAougAwIBAgIIFcbkDrCrHAkwCgYIKoZIzj0EAwMwgaMxCzAJBgNVBAYT +AlVTMRwwGgYDVQQKExNNb3ppbGxhIENvcnBvcmF0aW9uMS8wLQYDVQQLEyZNb3pp +bGxhIEFNTyBQcm9kdWN0aW9uIFNpZ25pbmcgU2VydmljZTFFMEMGA1UEAww8Q29u +dGVudCBTaWduaW5nIEludGVybWVkaWF0ZS9lbWFpbEFkZHJlc3M9Zm94c2VjQG1v +emlsbGEuY29tMB4XDTE5MDgyMzIyNDQzMVoXDTE5MTExMTIyNDQzMVowgakxCzAJ +BgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQHEw1Nb3VudGFp +biBWaWV3MRwwGgYDVQQKExNNb3ppbGxhIENvcnBvcmF0aW9uMRcwFQYDVQQLEw5D +bG91ZCBTZXJ2aWNlczE2MDQGA1UEAxMtcGlubmluZy1wcmVsb2FkLmNvbnRlbnQt +c2lnbmF0dXJlLm1vemlsbGEub3JnMHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEX6Zd +vZ32rj9rDdRInp0kckbMtAdxOQxJ7EVAEZB2KOLUyotQL6A/9YWrMB4Msb4hfvxj +Nw05CS5/J4qUmsTkKLXQskjRe9x96uOXxprWiVwR4OLYagkJJR7YG1mTXmFzo4GD +MIGAMA4GA1UdDwEB/wQEAwIHgDATBgNVHSUEDDAKBggrBgEFBQcDAzAfBgNVHSME +GDAWgBSgHUoXT4zCKzVF8WPx2nBwp8744TA4BgNVHREEMTAvgi1waW5uaW5nLXBy +ZWxvYWQuY29udGVudC1zaWduYXR1cmUubW96aWxsYS5vcmcwCgYIKoZIzj0EAwMD +aAAwZQIxAOi2Eusi6MtEPOARiU+kZIi1vPnzTI71cA2ZIpzZ9aYg740eoJml8Guz +3oC6yXiIDAIwSy4Eylf+/nSMA73DUclcCjZc2yfRYIogII+krXBxoLkbPJcGaitx +qvRy6gQ1oC/z +-----END CERTIFICATE----- +""" + + +async def test_positive(): + url = "https://fake.local" + + next_month = utcnow() + timedelta(days=30) + fake_cert = mock.MagicMock(not_valid_before=utcnow(), not_valid_after=next_month) + + with mock.patch(f"{MODULE}.fetch_cert", return_value=fake_cert) as mocked: + status, data = await run(url) + mocked.assert_called_with(url) + + assert status is True + assert data == {"expires": next_month.isoformat()} + + +async def test_negative(): + url = "https://fake.local" + + next_month = utcnow() + timedelta(days=30) + fake_cert = mock.MagicMock(not_valid_before=utcnow(), not_valid_after=next_month) + + with mock.patch(f"{MODULE}.fetch_cert", return_value=fake_cert): + status, data = await run(url, min_remaining_days=40) + + assert status is False + assert data == {"expires": next_month.isoformat()} + + +async def test_fetch_cert(): + with mock.patch( + f"{MODULE}.ssl.get_server_certificate", return_value=CERT + ) as mocked: + cert = await fetch_cert("https://fake.local") + mocked.assert_called_with(("fake.local", 443)) + + assert cert.not_valid_after == datetime(2019, 11, 11, 22, 44, 31)