forked from getsentry/integration-platform-example
-
Notifications
You must be signed in to change notification settings - Fork 0
/
alert_handler.py
129 lines (117 loc) · 4.39 KB
/
alert_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from typing import Any, Mapping, Sequence
from flask import Response
from src import app
from src.models import Item, SentryInstallation
from src.models.item import ItemColumn
from src.database import db_session
from ..alert_rule_action import (
AlertRuleSettings,
convert_sentry_fields_to_dict,
SentryField,
)
def get_alert_rule_settings(
    sentry_installation: SentryInstallation,
    data: Mapping[str, Any],
    action: str = None,
) -> AlertRuleSettings:
    """
    Extracts the alert rule settings fields from a webhook payload and returns
    them converted by convert_sentry_fields_to_dict (a mapping of
    field["name"] to field["value"]).

    The payload is treated as an issue alert when data["event"] is truthy;
    otherwise it is treated as a metric alert and `action` is used to pick the
    trigger whose "label" matches. Any missing level of nesting results in an
    empty list of fields being converted.
    """
    # Issue alerts: the settings sit directly under "issue_alert".
    if data.get("event"):
        issue_alert = data.get("issue_alert", {})
        issue_fields: Sequence[SentryField] = issue_alert.get("settings", [])
        return convert_sentry_fields_to_dict(issue_fields)

    # Metric alerts: first find the trigger labelled with the current action...
    alert_rule = data.get("metric_alert", {}).get("alert_rule", {})
    matching_trigger: Mapping[str, Any] = {}
    for candidate in alert_rule.get("triggers", []):
        if candidate.get("label") == action:
            matching_trigger = candidate
            break

    # ...then the first action on that trigger that targets this installation.
    own_action: Mapping[str, Any] = {}
    for trigger_action in matching_trigger.get("actions", []):
        installation_uuid = trigger_action.get("sentry_app_installation_uuid")
        if installation_uuid == sentry_installation.uuid:
            own_action = trigger_action
            break

    metric_fields: Sequence[SentryField] = own_action.get("settings", [])
    return convert_sentry_fields_to_dict(metric_fields)
def handle_issue_alert(
    sentry_installation: SentryInstallation,
    data: Mapping[str, Any],
) -> Response:
    """
    Creates a new Item in the Todo column from an issue alert webhook payload.

    Title, description and assignee come from the alert rule settings when
    they are set; title and description otherwise fall back to values taken
    from data["event"]. Always responds with an empty 202.
    """
    settings = get_alert_rule_settings(sentry_installation, data)

    organization_id = sentry_installation.organization_id
    event = data["event"]

    # Prefer the user-configured values, fall back to the event's own data.
    title_text = settings.get("title") or event["title"]
    description = settings.get("description")
    if not description:
        description = f"Latest Trigger: {event['web_url']}"

    item_fields = {
        "organization_id": organization_id,
        "title": f"🚨 Issue Alert: {title_text}",
        "description": description,
        "column": ItemColumn.Todo,
        "sentry_id": event["issue_id"],
        # data["issue_alert"] is only present for Alert Rule Action webhooks
        "sentry_alert_id": data.get("issue_alert", {}).get("id"),
        "assignee_id": settings.get("userId"),
    }

    db_session.add(Item(**item_fields))
    db_session.commit()

    app.logger.info("Created item from Sentry issue alert trigger")
    return Response("", 202)
def handle_metric_alert(
    sentry_installation: SentryInstallation,
    data: Mapping[str, Any],
    action: str,
) -> Response:
    """
    Creates or updates the Item that tracks a metric alert.

    The title prefix reflects `action` ("resolved", "warning", anything else is
    shown as critical). Title, description and assignee come from the alert
    rule settings when set; title and description otherwise fall back to
    data["metric_alert"]["title"] and data["description_text"].

    An existing Item with the same sentry_alert_id and organization_id is
    overwritten with the new values (including being moved back to the Todo
    column); otherwise a new Item is created. Always responds with an empty
    202.
    """
    if action == "resolved":
        item_title_prefix = "✅ Resolved Metric"
    elif action == "warning":
        item_title_prefix = "⚠️ Warning Metric"
    else:
        item_title_prefix = "🔥 Critical Metric"

    settings = get_alert_rule_settings(sentry_installation, data, action)
    item_data = {
        "title": f"{item_title_prefix}: {settings.get('title') or data['metric_alert']['title']}",
        "description": settings.get("description") or data["description_text"],
        "column": ItemColumn.Todo,
        "assignee_id": settings.get("userId"),
        "sentry_alert_id": data.get("metric_alert", {}).get("id"),
        "organization_id": sentry_installation.organization_id,
    }

    # Only look for an existing item when the payload actually carries an alert
    # id. Comparing the column against None is rendered as "IS NULL", which
    # would match any item of this organization that has no alert id (e.g. one
    # created from a plain issue alert webhook) and overwrite it.
    item = None
    if item_data["sentry_alert_id"] is not None:
        item = Item.query.filter(
            Item.sentry_alert_id == item_data["sentry_alert_id"],
            Item.organization_id == item_data["organization_id"],
        ).first()

    item_created = item is None
    if item_created:
        item = Item(**item_data)
        db_session.add(item)
    else:
        # Refresh every tracked field on the existing item.
        for key, value in item_data.items():
            setattr(item, key, value)
    db_session.commit()

    app.logger.info(
        f"{'Created' if item_created else 'Modified'} item from metric alert {action} trigger"
    )
    return Response("", 202)
def alert_handler(
    resource: str,
    action: str,
    sentry_installation: SentryInstallation,
    data: Mapping[str, Any],
) -> Response:
    """
    Dispatches an alert webhook to the handler for its resource type.

    "event_alert" goes to handle_issue_alert, "metric_alert" goes to
    handle_metric_alert when the action is one of 'resolved', 'warning' or
    'critical'. Anything else is logged and answered with an empty 400.
    """
    # Issue Alerts (or Event Alerts) only have one type of action: 'triggered'
    if resource == "event_alert":
        return handle_issue_alert(sentry_installation, data)

    if resource != "metric_alert":
        app.logger.info(f"Unexpected Sentry resource: {resource}")
        return Response("", 400)

    # Metric Alerts have three types of actions: 'resolved', 'warning', and 'critical'
    if action not in ("resolved", "warning", "critical"):
        app.logger.info(f"Unexpected Sentry metric alert action: {action}")
        return Response("", 400)

    return handle_metric_alert(sentry_installation, data, action)