diff --git a/lemur/plugins/lemur_telegram/__init__.py b/lemur/plugins/lemur_telegram/__init__.py new file mode 100644 index 000000000..e537d4726 --- /dev/null +++ b/lemur/plugins/lemur_telegram/__init__.py @@ -0,0 +1,5 @@ +try: + from importlib.metadata import version + VERSION = version(__name__) +except Exception as e: + VERSION = "unknown" diff --git a/lemur/plugins/lemur_telegram/plugin.py b/lemur/plugins/lemur_telegram/plugin.py new file mode 100644 index 000000000..6eaf42dbf --- /dev/null +++ b/lemur/plugins/lemur_telegram/plugin.py @@ -0,0 +1,109 @@ +""" +.. module: lemur.plugins.lemur_telegram.plugin + :platform: Unix + :copyright: (c) 2025 by Fedor S + :license: Apache, see LICENSE for more details. + +.. moduleauthor:: Fedor S +""" +import arrow +import requests +from flask import current_app + +from lemur.common.utils import check_validation +from lemur.plugins import lemur_telegram as telegram +from lemur.plugins.bases import ExpirationNotificationPlugin + + +def escape(text): + special = r'_*[]()~`>#+-=|{}.!' + return ''.join('\\' + c if c in special else c for c in text) + + +def create_certificate_url(name): + return "https://{hostname}/#/certificates/{name}".format( + hostname=current_app.config.get("LEMUR_HOSTNAME"), name=name + ) + + +def create_expiration_attachments(certificates): + attachments = [] + for certificate in certificates: + name = escape(certificate["name"]) + owner = escape(certificate["owner"]) + url = create_certificate_url(certificate["name"]) + expires = arrow.get(certificate["validityEnd"]).format("dddd, MMMM D, YYYY") + endpoints = len(certificate["endpoints"]) + attachments.append( + f"*Certificate:* [{name}]({url})\n*Owner:* {owner}\n*Expires:* {expires}\n*Endpoints:* {endpoints}\n\n" + ) + return attachments + + +def create_rotation_attachments(certificate): + name = escape(certificate["name"]) + owner = escape(certificate["owner"]) + url = create_certificate_url(certificate["name"]) + expires = arrow.get(certificate["validityEnd"]).format("dddd, MMMM D, YYYY") + endpoints = len(certificate["endpoints"]) + return f"*Certificate:* [{name}]({url})\n*Owner:* {owner}\n*Expires:* {expires}\n*Endpoints rotated:* {endpoints}\n\n" + + +class TelegramNotificationPlugin(ExpirationNotificationPlugin): + title = "Telegram" + slug = "tg-notification" + description = "Sends certificate expiration notifications to Telegram" + version = telegram.VERSION + + author = "Fedor S" + author_url = "https://github.com/netflix/lemur" + + additional_options = [ + { + "name": "chat", + "type": "str", + "required": True, + "validation": check_validation("^[+-]?\d+(\.\d+)?$"), + "helpMessage": "The chat id to send notification to", + }, + { + "name": "token", + "type": "str", + "required": True, + "validation": check_validation("^\d+:[A-Za-z0-9_]+$"), + "helpMessage": "Bot API Token", + }, + ] + + def send(self, notification_type, message, targets, options, **kwargs): + """ + A typical check can be performed using the notify command: + `lemur notify` + + While we receive a `targets` parameter here, it is unused, plugin currently supports sending to only one chat. + """ + attachments = None + if notification_type == "expiration": + attachments = create_expiration_attachments(message) + + elif notification_type == "rotation": + attachments = create_rotation_attachments(message) + + if not attachments: + raise Exception("Unable to create message attachments") + + data = { + "parse_mode": "MarkdownV2", + "chat_id": self.get_option("chat", options), + "text": "*Lemur {} Notification*\n\n{}".format(notification_type.capitalize(), *attachments), + } + + r = requests.post("https://api.telegram.org/bot{}/sendMessage".format(self.get_option("token", options)), + data=data) + + if r.status_code not in [200]: + raise Exception(f"Failed to send message. Telegram response: {r.status_code} {data}") + + current_app.logger.info( + f"Telegram response: {r.status_code} Message Body: {data}" + ) diff --git a/lemur/plugins/lemur_telegram/tests/conftest.py b/lemur/plugins/lemur_telegram/tests/conftest.py new file mode 100644 index 000000000..0e1cd89f3 --- /dev/null +++ b/lemur/plugins/lemur_telegram/tests/conftest.py @@ -0,0 +1 @@ +from lemur.tests.conftest import * # noqa diff --git a/lemur/plugins/lemur_telegram/tests/test_telegram.py b/lemur/plugins/lemur_telegram/tests/test_telegram.py new file mode 100644 index 000000000..cfb7ae061 --- /dev/null +++ b/lemur/plugins/lemur_telegram/tests/test_telegram.py @@ -0,0 +1,142 @@ +from datetime import timedelta + +import arrow +import pytest +from moto import mock_ses + +from lemur.certificates.schemas import certificate_notification_output_schema +from lemur.tests.factories import NotificationFactory, CertificateFactory +from lemur.tests.test_messaging import verify_sender_email + + +def test_formatting(certificate): + from lemur.plugins.lemur_telegram.plugin import create_expiration_attachments + data = [certificate_notification_output_schema.dump(certificate).data] + attachments = create_expiration_attachments(data) + body = attachments[0] + assert certificate.name in body + assert "Owner:" in body + assert "Expires:" in body + assert "Endpoints:" in body + assert "https://" in body + assert certificate.name in body.split("(")[1] + + +def get_options(): + return [ + {"name": "interval", "value": 10}, + {"name": "unit", "value": "days"}, + {"name": "chat", "value": "12345"}, + {"name": "token", "value": "999:TESTTOKEN"}, + ] + + +def prepare_test(): + verify_sender_email() + notification = NotificationFactory(plugin_name="tg-notification") + notification.options = get_options() + now = arrow.utcnow() + in_ten_days = now + timedelta(days=10, hours=1) + certificate = CertificateFactory() + certificate.not_after = in_ten_days + certificate.notifications.append(notification) + + +@mock_ses() +def test_send_expiration_notification(mocker): + from lemur.notifications.messaging import send_expiration_notifications + # Telegram API request mock + mock_post = mocker.patch( + "lemur.plugins.lemur_telegram.plugin.requests.post" + ) + mock_post.return_value.status_code = 200 + + prepare_test() + + sent, failed = send_expiration_notifications([], []) + + # Why 3: + # - owner email + # - security email + # - telegram notification + assert (sent, failed) == (3, 0) + + # Ensure Telegram was hit + assert mock_post.called + data = mock_post.call_args[1]["data"] + assert "Lemur Expiration Notification" in data["text"] + assert data["chat_id"] == "12345" + + +@mock_ses() +def test_send_expiration_notification_telegram_disabled(mocker): + from lemur.notifications.messaging import send_expiration_notifications + + mocker.patch( + "lemur.plugins.lemur_telegram.plugin.requests.post" + ) + + prepare_test() + + # Disabling telegram means: owner+security emails SHOULD NOT be skipped, + # but messaging rules say: if the *main* plugin (tg-notification) disabled → skip all + assert send_expiration_notifications([], ["tg-notification"]) == (0, 0) + + +@mock_ses() +def test_send_expiration_notification_email_disabled(mocker): + from lemur.notifications.messaging import send_expiration_notifications + + mocker.patch( + "lemur.plugins.lemur_telegram.plugin.requests.post" + ) + + prepare_test() + + # Email disabled → Telegram still fires + # sent, failed = + assert send_expiration_notifications([], ["email-notification"]) == (0, 1) + + +@mock_ses() +def test_send_expiration_notification_both_disabled(mocker): + from lemur.notifications.messaging import send_expiration_notifications + + mocker.patch( + "lemur.plugins.lemur_telegram.plugin.requests.post" + ) + + prepare_test() + + assert send_expiration_notifications([], ["tg-notification", "email-notification"]) == (0, 0) + + +def test_send_failure_on_bad_status(mocker, certificate): + from lemur.plugins.lemur_telegram.plugin import TelegramNotificationPlugin + + plugin = TelegramNotificationPlugin() + + mock_post = mocker.patch( + "lemur.plugins.lemur_telegram.plugin.requests.post" + ) + mock_post.return_value.status_code = 403 + + options = {"chat": "12345", "token": "999:BAD"} + + cert_data = [certificate_notification_output_schema.dump(certificate).data] + + with pytest.raises(Exception): + plugin.send("expiration", cert_data, None, options) + + +def test_unsupported_notification_type_raises(mocker): + from lemur.plugins.lemur_telegram.plugin import TelegramNotificationPlugin + + plugin = TelegramNotificationPlugin() + + mocker.patch("lemur.plugins.lemur_telegram.plugin.requests.post") + + with pytest.raises(Exception) as exc: + plugin.send("unknown", {}, None, {"chat": "1", "token": "999:X"}) + + assert "Unable to create message attachments" in str(exc.value) diff --git a/pyproject.toml b/pyproject.toml index c3846156e..248021d29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,7 @@ entrust_issuer = "lemur.plugins.lemur_entrust.plugin:EntrustIssuerPlugin" entrust_source = "lemur.plugins.lemur_entrust.plugin:EntrustSourcePlugin" azure_destination = "lemur.plugins.lemur_azure_dest.plugin:AzureDestinationPlugin" google_ca_issuer = "lemur.plugins.lemur_google_ca.plugin:GoogleCaIssuerPlugin" +telegram_notification = "lemur.plugins.lemur_telegram.plugin:TelegramNotificationPlugin" [tool.setuptools] include-package-data = true