Skip to content

Commit

Permalink
Add check for certificates expiration (#887)
Browse files Browse the repository at this point in the history
* Add check for certificates validation

* Fix lint

* Adjust default parameter values
  • Loading branch information
leplatrem authored Sep 14, 2021
1 parent a630c4e commit 92d5144
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
69 changes: 69 additions & 0 deletions checks/core/certificate_expiration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
SSL certificate should not expire for at least some minimum number of days.
Returns expiration date.
"""
import asyncio
import datetime
import logging
import ssl
from urllib.parse import urlparse

import cryptography
import cryptography.x509
from cryptography.hazmat.backends import default_backend as crypto_default_backend

from poucave.typings import CheckResult
from poucave.utils import utcnow


logger = logging.getLogger(__name__)


EXPOSED_PARAMETERS = ["url", "min_remaining_days"]


# Bound the alert thresholds.
LOWER_MIN_REMAINING_DAYS = 10
UPPER_MIN_REMAINING_DAYS = 30 # No need to warn more than 1 month in advance.


async def fetch_cert(url):
parsed = urlparse(url)
address = (parsed.netloc, parsed.port or 443)
loop = asyncio.get_event_loop()
cert_pem = await loop.run_in_executor(
None, lambda: ssl.get_server_certificate(address)
)
return cryptography.x509.load_pem_x509_certificate(
cert_pem.encode("utf8"), backend=crypto_default_backend()
)


async def run(
url: str,
percentage_remaining_validity: int = 5,
min_remaining_days: int = LOWER_MIN_REMAINING_DAYS,
max_remaining_days: int = UPPER_MIN_REMAINING_DAYS,
) -> CheckResult:
cert = await fetch_cert(url)
start = cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
end = cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
lifespan = (end - start).days

# The minimum remaining days depends on the certificate lifespan.
relative_minimum = lifespan * percentage_remaining_validity / 100
bounded_minimum = int(
min(UPPER_MIN_REMAINING_DAYS, max(min_remaining_days, relative_minimum))
)
remaining_days = (end - utcnow()).days

logger.debug(
f"Certificate lasts {lifespan} days and ends in {remaining_days} days "
f"({remaining_days - bounded_minimum} days before alert)."
)

success = remaining_days > bounded_minimum
return success, {
"expires": end.isoformat(),
}
67 changes: 67 additions & 0 deletions tests/checks/core/test_certificate_expiration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from datetime import datetime, timedelta
from unittest import mock

from checks.core.certificate_expiration import fetch_cert, run
from poucave.utils import utcnow


MODULE = "checks.core.certificate_expiration"

CERT = """
-----BEGIN CERTIFICATE-----
MIIDBTCCAougAwIBAgIIFcbkDrCrHAkwCgYIKoZIzj0EAwMwgaMxCzAJBgNVBAYT
AlVTMRwwGgYDVQQKExNNb3ppbGxhIENvcnBvcmF0aW9uMS8wLQYDVQQLEyZNb3pp
bGxhIEFNTyBQcm9kdWN0aW9uIFNpZ25pbmcgU2VydmljZTFFMEMGA1UEAww8Q29u
dGVudCBTaWduaW5nIEludGVybWVkaWF0ZS9lbWFpbEFkZHJlc3M9Zm94c2VjQG1v
emlsbGEuY29tMB4XDTE5MDgyMzIyNDQzMVoXDTE5MTExMTIyNDQzMVowgakxCzAJ
BgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlhMRYwFAYDVQQHEw1Nb3VudGFp
biBWaWV3MRwwGgYDVQQKExNNb3ppbGxhIENvcnBvcmF0aW9uMRcwFQYDVQQLEw5D
bG91ZCBTZXJ2aWNlczE2MDQGA1UEAxMtcGlubmluZy1wcmVsb2FkLmNvbnRlbnQt
c2lnbmF0dXJlLm1vemlsbGEub3JnMHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEX6Zd
vZ32rj9rDdRInp0kckbMtAdxOQxJ7EVAEZB2KOLUyotQL6A/9YWrMB4Msb4hfvxj
Nw05CS5/J4qUmsTkKLXQskjRe9x96uOXxprWiVwR4OLYagkJJR7YG1mTXmFzo4GD
MIGAMA4GA1UdDwEB/wQEAwIHgDATBgNVHSUEDDAKBggrBgEFBQcDAzAfBgNVHSME
GDAWgBSgHUoXT4zCKzVF8WPx2nBwp8744TA4BgNVHREEMTAvgi1waW5uaW5nLXBy
ZWxvYWQuY29udGVudC1zaWduYXR1cmUubW96aWxsYS5vcmcwCgYIKoZIzj0EAwMD
aAAwZQIxAOi2Eusi6MtEPOARiU+kZIi1vPnzTI71cA2ZIpzZ9aYg740eoJml8Guz
3oC6yXiIDAIwSy4Eylf+/nSMA73DUclcCjZc2yfRYIogII+krXBxoLkbPJcGaitx
qvRy6gQ1oC/z
-----END CERTIFICATE-----
"""


async def test_positive():
url = "https://fake.local"

next_month = utcnow() + timedelta(days=30)
fake_cert = mock.MagicMock(not_valid_before=utcnow(), not_valid_after=next_month)

with mock.patch(f"{MODULE}.fetch_cert", return_value=fake_cert) as mocked:
status, data = await run(url)
mocked.assert_called_with(url)

assert status is True
assert data == {"expires": next_month.isoformat()}


async def test_negative():
url = "https://fake.local"

next_month = utcnow() + timedelta(days=30)
fake_cert = mock.MagicMock(not_valid_before=utcnow(), not_valid_after=next_month)

with mock.patch(f"{MODULE}.fetch_cert", return_value=fake_cert):
status, data = await run(url, min_remaining_days=40)

assert status is False
assert data == {"expires": next_month.isoformat()}


async def test_fetch_cert():
with mock.patch(
f"{MODULE}.ssl.get_server_certificate", return_value=CERT
) as mocked:
cert = await fetch_cert("https://fake.local")
mocked.assert_called_with(("fake.local", 443))

assert cert.not_valid_after == datetime(2019, 11, 11, 22, 44, 31)

0 comments on commit 92d5144

Please sign in to comment.