Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lemur/plugins/lemur_telegram/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
try:
from importlib.metadata import version
VERSION = version(__name__)
except Exception as e:
VERSION = "unknown"
109 changes: 109 additions & 0 deletions lemur/plugins/lemur_telegram/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a changelog / amdinistration.rst entry for this as well?

.. module: lemur.plugins.lemur_telegram.plugin
:platform: Unix
:copyright: (c) 2025 by Fedor S
:license: Apache, see LICENSE for more details.

.. moduleauthor:: Fedor S <[email protected]>
"""
import arrow
import requests
from flask import current_app

from lemur.common.utils import check_validation
from lemur.plugins import lemur_telegram as telegram
from lemur.plugins.bases import ExpirationNotificationPlugin


def escape(text):
special = r'_*[]()~`>#+-=|{}.!'
return ''.join('\\' + c if c in special else c for c in text)


def create_certificate_url(name):
return "https://{hostname}/#/certificates/{name}".format(
hostname=current_app.config.get("LEMUR_HOSTNAME"), name=name
)


def create_expiration_attachments(certificates):
attachments = []
for certificate in certificates:
name = escape(certificate["name"])
owner = escape(certificate["owner"])
url = create_certificate_url(certificate["name"])
expires = arrow.get(certificate["validityEnd"]).format("dddd, MMMM D, YYYY")
endpoints = len(certificate["endpoints"])
attachments.append(
f"*Certificate:* [{name}]({url})\n*Owner:* {owner}\n*Expires:* {expires}\n*Endpoints:* {endpoints}\n\n"
)
return attachments


def create_rotation_attachments(certificate):
name = escape(certificate["name"])
owner = escape(certificate["owner"])
url = create_certificate_url(certificate["name"])
expires = arrow.get(certificate["validityEnd"]).format("dddd, MMMM D, YYYY")
endpoints = len(certificate["endpoints"])
return f"*Certificate:* [{name}]({url})\n*Owner:* {owner}\n*Expires:* {expires}\n*Endpoints rotated:* {endpoints}\n\n"


class TelegramNotificationPlugin(ExpirationNotificationPlugin):
title = "Telegram"
slug = "tg-notification"
description = "Sends certificate expiration notifications to Telegram"
version = telegram.VERSION

author = "Fedor S"
author_url = "https://github.com/netflix/lemur"

additional_options = [
{
"name": "chat",
"type": "str",
"required": True,
"validation": check_validation("^[+-]?\d+(\.\d+)?$"),
"helpMessage": "The chat id to send notification to",
},
{
"name": "token",
"type": "str",
"required": True,
"validation": check_validation("^\d+:[A-Za-z0-9_]+$"),
"helpMessage": "Bot API Token",
},
]

def send(self, notification_type, message, targets, options, **kwargs):
"""
A typical check can be performed using the notify command:
`lemur notify`

While we receive a `targets` parameter here, it is unused, plugin currently supports sending to only one chat.
"""
attachments = None
if notification_type == "expiration":
attachments = create_expiration_attachments(message)

elif notification_type == "rotation":
attachments = create_rotation_attachments(message)

if not attachments:
raise Exception("Unable to create message attachments")

data = {
"parse_mode": "MarkdownV2",
"chat_id": self.get_option("chat", options),
"text": "*Lemur {} Notification*\n\n{}".format(notification_type.capitalize(), *attachments),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the structure, it should probably be:

For expiration (list of strings)

"text": "Lemur {} Notification\n\n{}".format(
notification_type.capitalize(),
''.join(attachments) # Join the list into one string
)

Or for rotation (single string)

"text": "Lemur {} Notification\n\n{}".format(
notification_type.capitalize(),
attachments # Already a string, no unpacking
)
as currently unpacking will just yield the first element

}

r = requests.post("https://api.telegram.org/bot{}/sendMessage".format(self.get_option("token", options)),
data=data)

if r.status_code not in [200]:
raise Exception(f"Failed to send message. Telegram response: {r.status_code} {data}")

current_app.logger.info(
f"Telegram response: {r.status_code} Message Body: {data}"
)
1 change: 1 addition & 0 deletions lemur/plugins/lemur_telegram/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from lemur.tests.conftest import * # noqa
142 changes: 142 additions & 0 deletions lemur/plugins/lemur_telegram/tests/test_telegram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from datetime import timedelta

import arrow
import pytest
from moto import mock_ses

from lemur.certificates.schemas import certificate_notification_output_schema
from lemur.tests.factories import NotificationFactory, CertificateFactory
from lemur.tests.test_messaging import verify_sender_email


def test_formatting(certificate):
from lemur.plugins.lemur_telegram.plugin import create_expiration_attachments
data = [certificate_notification_output_schema.dump(certificate).data]
attachments = create_expiration_attachments(data)
body = attachments[0]
assert certificate.name in body
assert "Owner:" in body
assert "Expires:" in body
assert "Endpoints:" in body
assert "https://" in body
assert certificate.name in body.split("(")[1]


def get_options():
return [
{"name": "interval", "value": 10},
{"name": "unit", "value": "days"},
{"name": "chat", "value": "12345"},
{"name": "token", "value": "999:TESTTOKEN"},
]


def prepare_test():
verify_sender_email()
notification = NotificationFactory(plugin_name="tg-notification")
notification.options = get_options()
now = arrow.utcnow()
in_ten_days = now + timedelta(days=10, hours=1)
certificate = CertificateFactory()
certificate.not_after = in_ten_days
certificate.notifications.append(notification)


@mock_ses()
def test_send_expiration_notification(mocker):
from lemur.notifications.messaging import send_expiration_notifications
# Telegram API request mock
mock_post = mocker.patch(
"lemur.plugins.lemur_telegram.plugin.requests.post"
)
mock_post.return_value.status_code = 200

prepare_test()

sent, failed = send_expiration_notifications([], [])

# Why 3:
# - owner email
# - security email
# - telegram notification
assert (sent, failed) == (3, 0)

# Ensure Telegram was hit
assert mock_post.called
data = mock_post.call_args[1]["data"]
assert "Lemur Expiration Notification" in data["text"]
assert data["chat_id"] == "12345"


@mock_ses()
def test_send_expiration_notification_telegram_disabled(mocker):
from lemur.notifications.messaging import send_expiration_notifications

mocker.patch(
"lemur.plugins.lemur_telegram.plugin.requests.post"
)

prepare_test()

# Disabling telegram means: owner+security emails SHOULD NOT be skipped,
# but messaging rules say: if the *main* plugin (tg-notification) disabled → skip all
assert send_expiration_notifications([], ["tg-notification"]) == (0, 0)


@mock_ses()
def test_send_expiration_notification_email_disabled(mocker):
from lemur.notifications.messaging import send_expiration_notifications

mocker.patch(
"lemur.plugins.lemur_telegram.plugin.requests.post"
)

prepare_test()

# Email disabled → Telegram still fires
# sent, failed =
assert send_expiration_notifications([], ["email-notification"]) == (0, 1)


@mock_ses()
def test_send_expiration_notification_both_disabled(mocker):
from lemur.notifications.messaging import send_expiration_notifications

mocker.patch(
"lemur.plugins.lemur_telegram.plugin.requests.post"
)

prepare_test()

assert send_expiration_notifications([], ["tg-notification", "email-notification"]) == (0, 0)


def test_send_failure_on_bad_status(mocker, certificate):
from lemur.plugins.lemur_telegram.plugin import TelegramNotificationPlugin

plugin = TelegramNotificationPlugin()

mock_post = mocker.patch(
"lemur.plugins.lemur_telegram.plugin.requests.post"
)
mock_post.return_value.status_code = 403

options = {"chat": "12345", "token": "999:BAD"}

cert_data = [certificate_notification_output_schema.dump(certificate).data]

with pytest.raises(Exception):
plugin.send("expiration", cert_data, None, options)


def test_unsupported_notification_type_raises(mocker):
from lemur.plugins.lemur_telegram.plugin import TelegramNotificationPlugin

plugin = TelegramNotificationPlugin()

mocker.patch("lemur.plugins.lemur_telegram.plugin.requests.post")

with pytest.raises(Exception) as exc:
plugin.send("unknown", {}, None, {"chat": "1", "token": "999:X"})

assert "Unable to create message attachments" in str(exc.value)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ entrust_issuer = "lemur.plugins.lemur_entrust.plugin:EntrustIssuerPlugin"
entrust_source = "lemur.plugins.lemur_entrust.plugin:EntrustSourcePlugin"
azure_destination = "lemur.plugins.lemur_azure_dest.plugin:AzureDestinationPlugin"
google_ca_issuer = "lemur.plugins.lemur_google_ca.plugin:GoogleCaIssuerPlugin"
telegram_notification = "lemur.plugins.lemur_telegram.plugin:TelegramNotificationPlugin"

[tool.setuptools]
include-package-data = true
Expand Down
Loading