|
1 | | -""" |
2 | | -Alert Notifier module - Sends notifications for detected anomalies |
3 | | -via multiple channels (Telegram, Webhooks). |
4 | | -""" |
5 | | - |
6 | 1 | import os |
7 | | -import json |
8 | | -import logging |
| 2 | +import time |
9 | 3 | import requests |
10 | | -from typing import List, Dict, Any, Optional |
11 | | -from datetime import datetime |
12 | | -from src.anomaly_detector import AnomalyResult |
13 | 4 |
|
14 | | -logger = logging.getLogger(__name__) |
15 | 5 |
|
16 | 6 | class AlertNotifier: |
17 | | - """ |
18 | | - Multichannel notification manager for system anomalies. |
19 | | - Supports Telegram Bot API and generic JSON webhooks. |
20 | | - """ |
21 | | - |
22 | | - def __init__(self, min_severity: float = 0.5): |
23 | | - """ |
24 | | - Initialize the notifier. |
25 | | - Args: |
26 | | - min_severity: Minimum severity score (0.0-1.0) to trigger alerts. |
27 | | - """ |
28 | | - self.min_severity = min_severity |
29 | | - # Telegram Config |
30 | | - self.tg_token = os.getenv("TELEGRAM_BOT_TOKEN") |
31 | | - self.tg_chat_id = os.getenv("TELEGRAM_CHANNEL_ID") |
32 | | - |
33 | | - # Webhook Config |
34 | | - self.webhook_url = os.getenv("ALERT_WEBHOOK_URL") |
35 | | - |
36 | | - # Status |
37 | | - self.has_telegram = bool(self.tg_token and self.tg_chat_id) |
38 | | - self.has_webhook = bool(self.webhook_url) |
39 | | - |
40 | | - if not (self.has_telegram or self.has_webhook): |
41 | | - logger.warning("AlertNotifier initialized with no active channels. Alerts will only be logged.") |
42 | | - |
43 | | - def _send_telegram(self, message: str): |
44 | | - """Send message via Telegram Bot API.""" |
45 | | - if not self.has_telegram: |
| 7 | + def __init__(self): |
| 8 | + self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN') |
| 9 | + self.telegram_channel_id = os.getenv('TELEGRAM_CHANNEL_ID') |
| 10 | + self.webhook_urls = self._load_webhook_urls() |
| 11 | + self.max_retries = int(os.getenv('WEBHOOK_MAX_RETRIES', '3')) |
| 12 | + self.base_backoff_seconds = float(os.getenv('WEBHOOK_BACKOFF_SECONDS', '1')) |
| 13 | + |
| 14 | + def _load_webhook_urls(self): |
| 15 | + urls = [] |
| 16 | + |
| 17 | + single_url = os.getenv('ALERT_WEBHOOK_URL') |
| 18 | + if single_url: |
| 19 | + urls.append(single_url) |
| 20 | + |
| 21 | + registry = os.getenv('ALERT_WEBHOOK_URLS', '') |
| 22 | + if registry: |
| 23 | + urls.extend([url.strip() for url in registry.split(',') if url.strip()]) |
| 24 | + |
| 25 | + return list(dict.fromkeys(urls)) |
| 26 | + |
| 27 | + def notify_anomaly(self, result): |
| 28 | + if not getattr(result, 'is_anomaly', False): |
46 | 29 | return |
47 | | - |
48 | | - url = f"https://api.telegram.org/bot{self.tg_token}/sendMessage" |
| 30 | + |
49 | 31 | payload = { |
50 | | - "chat_id": self.tg_chat_id, |
51 | | - "text": message, |
52 | | - "parse_mode": "HTML" |
| 32 | + 'event': 'high_priority_insight', |
| 33 | + 'type': 'anomaly', |
| 34 | + 'metric_name': result.metric_name, |
| 35 | + 'severity_score': result.severity_score, |
| 36 | + 'current_value': result.current_value, |
| 37 | + 'baseline_mean': result.baseline_mean, |
| 38 | + 'baseline_std': result.baseline_std, |
| 39 | + 'z_score': result.z_score, |
| 40 | + 'timestamp': result.timestamp.isoformat() if result.timestamp else None, |
53 | 41 | } |
54 | | - |
55 | | - try: |
56 | | - response = requests.post(url, json=payload, timeout=10) |
57 | | - response.raise_for_status() |
58 | | - logger.info("Telegram alert sent successfully") |
59 | | - except Exception as e: |
60 | | - logger.error(f"Failed to send Telegram alert: {e}") |
61 | | - |
62 | | - def _send_webhook(self, data: Dict[str, Any]): |
63 | | - """Send JSON payload to generic webhook.""" |
64 | | - if not self.has_webhook: |
| 42 | + |
| 43 | + self._send_telegram(payload) |
| 44 | + self._send_webhooks(payload) |
| 45 | + |
| 46 | + def _send_telegram(self, payload): |
| 47 | + if not self.telegram_bot_token or not self.telegram_channel_id: |
65 | 48 | return |
66 | | - |
67 | | - try: |
68 | | - response = requests.post( |
69 | | - self.webhook_url, |
70 | | - json=data, |
71 | | - headers={"Content-Type": "application/json"}, |
72 | | - timeout=10 |
73 | | - ) |
74 | | - response.raise_for_status() |
75 | | - logger.info("Webhook alert sent successfully") |
76 | | - except Exception as e: |
77 | | - logger.error(f"Failed to send Webhook alert: {e}") |
78 | | - |
79 | | - def format_anomaly_message(self, result: AnomalyResult) -> str: |
80 | | - """Format AnomalyResult for Telegram (HTML).""" |
81 | | - severity_emoji = "🔴" if result.severity_score > 0.8 else "🟠" |
82 | | - |
83 | | - return ( |
84 | | - f"🚨 <b>{result.metric_name.upper()} ANOMALY DETECTED</b> {severity_emoji}\n\n" |
85 | | - f"<b>Severity:</b> {result.severity_score:.2f}\n" |
86 | | - f"<b>Value:</b> {result.current_value:,.4f}\n" |
87 | | - f"<b>Z-Score:</b> {result.z_score:.2f}\n" |
88 | | - f"<b>Baseline Mean:</b> {result.baseline_mean:,.4f}\n" |
89 | | - f"<b>Timestamp:</b> {result.timestamp.strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n" |
90 | | - f"<i>#LumenPulse #Anomalies #Alert</i>" |
| 49 | + |
| 50 | + text = ( |
| 51 | + '🚨 High-Priority Insight\n' |
| 52 | + f"Metric: {payload['metric_name']}\n" |
| 53 | + f"Severity: {payload['severity_score']}\n" |
| 54 | + f"Current: {payload['current_value']}\n" |
| 55 | + f"Z-Score: {payload['z_score']}" |
91 | 56 | ) |
92 | 57 |
|
93 | | - def notify_anomaly(self, result: AnomalyResult): |
94 | | - """Send notifications for a single anomaly if it meets severity threshold.""" |
95 | | - if not result.is_anomaly or result.severity_score < self.min_severity: |
96 | | - return |
| 58 | + requests.post( |
| 59 | + f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage", |
| 60 | + json={ |
| 61 | + 'chat_id': self.telegram_channel_id, |
| 62 | + 'text': text, |
| 63 | + }, |
| 64 | + timeout=10, |
| 65 | + ) |
| 66 | + |
| 67 | + def _send_webhooks(self, payload): |
| 68 | + for url in self.webhook_urls: |
| 69 | + self._post_with_retry(url, payload) |
| 70 | + |
| 71 | + def _post_with_retry(self, url, payload): |
| 72 | + for attempt in range(self.max_retries): |
| 73 | + try: |
| 74 | + response = requests.post(url, json=payload, timeout=10) |
| 75 | + if response.status_code < 400: |
| 76 | + return True |
| 77 | + except requests.RequestException: |
| 78 | + pass |
| 79 | + |
| 80 | + if attempt < self.max_retries - 1: |
| 81 | + time.sleep(self.base_backoff_seconds * (2 ** attempt)) |
97 | 82 |
|
98 | | - logger.info(f"Notifying anomaly: {result.metric_name} (severity={result.severity_score:.2f})") |
99 | | - |
100 | | - # 1. Telegram |
101 | | - if self.has_telegram: |
102 | | - msg = self.format_anomaly_message(result) |
103 | | - self._send_telegram(msg) |
104 | | - |
105 | | - # 2. Webhook |
106 | | - if self.has_webhook: |
107 | | - self._send_webhook({ |
108 | | - "event": "anomaly_detected", |
109 | | - "severity": "high" if result.severity_score > 0.8 else "medium", |
110 | | - "data": result.to_dict() |
111 | | - }) |
112 | | - |
113 | | - def notify_batch(self, results: List[AnomalyResult]): |
114 | | - """Filter and notify for a list of results.""" |
115 | | - for result in results: |
116 | | - if result.is_anomaly: |
117 | | - self.notify_anomaly(result) |
118 | | - |
119 | | -# Singleton instance for easy import |
120 | | -notifier = AlertNotifier( |
121 | | - min_severity=float(os.getenv("ALERT_MIN_SEVERITY", "0.5")) |
122 | | -) |
| 83 | + return False |
0 commit comments