feat: add Airflow provider integration and documentation #4090

Merged Β· 10 commits Β· Mar 19, 2025
13 changes: 13 additions & 0 deletions README.md
@@ -754,6 +754,19 @@
</tr>
</table>

### Workflow Orchestration

<table>
<tr>
<td align="center" width="150">
<a href="https://docs.keephq.dev/providers/documentation/airflow-provider" target="_blank">
<img width="40" src="keep-ui/public/icons/airflow-icon.png" alt="Airflow"/><br/>
Airflow
</a>
</td>
</tr>
</table>

## Workflows

Keep is GitHub Actions for your monitoring tools.
Binary file added docs/images/airflow_1.png
Binary file added docs/images/airflow_2.png
1 change: 1 addition & 0 deletions docs/mint.json
@@ -142,6 +142,7 @@
{
  "group": "Supported Providers",
  "pages": [
    "providers/documentation/airflow-provider",
    "providers/documentation/aks-provider",
    "providers/documentation/amazonsqs-provider",
    "providers/documentation/anthropic-provider",
152 changes: 152 additions & 0 deletions docs/providers/documentation/airflow-provider.mdx
@@ -0,0 +1,152 @@
---
title: "Airflow"
sidebarTitle: "Airflow Provider"
description: "The Airflow provider integration allows you to send alerts (e.g. DAG failures) from Airflow to Keep via webhooks."
---

## Overview

[Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) is an open-source tool for programmatically authoring, scheduling, and monitoring data pipelines. Airflow's extensible Python framework enables you to build workflows that connect with virtually any technology. When working with Airflow, it's essential to monitor the health of your DAGs and tasks to ensure that your data pipelines run smoothly. The Airflow Provider integration allows seamless communication between Airflow and Keep, so you can forward alerts, such as task failures, directly to Keep via webhook configurations.

![Apache Airflow](/images/airflow_1.png)

## Connecting Airflow to Keep

### Alert Integration via Webhook

To connect Airflow to Keep, configure Airflow to send alerts using Keep's webhook. You must provide:

- **Keep Webhook URL**: The webhook URL provided by Keep (for example, `https://api.keephq.dev/alerts/event/airflow`).
- **Keep API Key**: The API key generated on Keep's platform, which is used for authentication.

A common method to integrate Airflow with Keep is by configuring alerts through [Airflow Callbacks](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html). For instance, when an Airflow task fails, a callback can send an alert to Keep via the webhook.

There are several steps to implement this:

### Step 1: Define Keep's Alert Information

Structure your alert payload with the following information:

```python
data = {
    "name": "Airflow Task Failure",
    "description": "Task keep_task failed in DAG keep_dag",
    "status": "firing",
    "service": "pipeline",
    "severity": "critical",
}
```
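
Beyond these required fields, the provider also recognizes optional fields such as `fingerprint` (used for deduplication), `url`, `environment`, `labels`, and `lastReceived`, which map directly onto Keep's alert model (see `_format_alert` in `airflow_provider.py`). A fuller payload might look like this, with illustrative values:

```python
data = {
    "name": "Airflow Task Failure",
    "description": "Task keep_task failed in DAG keep_dag",
    "status": "firing",
    "service": "pipeline",
    "severity": "critical",
    # Optional fields recognized by the provider (values are illustrative):
    "fingerprint": "keep_dag:keep_task",  # stable ID so repeated alerts deduplicate
    "url": "https://airflow.example.com/dags/keep_dag",
    "environment": "production",
    "labels": {"team": "data-platform"},
}
```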

### Step 2: Configure Keep's Webhook Credentials

To send alerts to Keep, configure the webhook URL and API key. Below is an example of how to send an alert using Python:

> **Note**: Set the `KEEP_API_KEY` environment variable to your Keep API key before running this code.

```python
import os
import requests

def send_alert_to_keep(dag_id, task_id, execution_date, error_message):
    # Replace with your specific Keep webhook URL if different.
    keep_webhook_url = "https://api.keephq.dev/alerts/event/airflow"
    api_key = os.getenv("KEEP_API_KEY")
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-API-KEY": api_key,
    }

    data = {
        "name": f"Airflow Task Failure: {task_id}",
        "message": f"Task {task_id} failed in DAG {dag_id} at {execution_date}",
        "status": "firing",
        "service": "pipeline",
        "severity": "critical",
        "description": str(error_message),
    }

    response = requests.post(keep_webhook_url, headers=headers, json=data)
    response.raise_for_status()
```
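
Before wiring this into a DAG, you can call the function from a Python shell to smoke-test the webhook URL and API key. This is a minimal check with illustrative argument values:

```python
from datetime import datetime, timezone

# If this returns without raising, the webhook URL and API key are valid;
# raise_for_status() would throw on any non-2xx response.
send_alert_to_keep(
    dag_id="keep_dag",
    task_id="keep_task",
    execution_date=datetime.now(timezone.utc).isoformat(),
    error_message="manual webhook smoke test",
)
```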

### Step 3: Configure the Airflow Callback Function

Now, configure the callback so that an alert is sent to Keep when a task fails. You can attach this callback to one or more tasks in your DAG as shown below:

```python
import os
from datetime import datetime, timedelta

import requests

from airflow import DAG
# `airflow.operators.bash_operator` is deprecated; Airflow 2.x exposes
# BashOperator from `airflow.operators.bash`.
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def send_alert_to_keep(dag_id, task_id, execution_date, error_message):
    # Replace with your specific Keep webhook URL if different.
    keep_webhook_url = "https://api.keephq.dev/alerts/event/airflow"
    api_key = os.getenv("KEEP_API_KEY")
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-API-KEY": api_key,
    }

    data = {
        "name": f"Airflow Task Failure: {task_id}",
        "message": f"Task {task_id} failed in DAG {dag_id} at {execution_date}",
        "status": "firing",
        "service": "pipeline",
        "severity": "critical",
        "description": str(error_message),
    }

    response = requests.post(keep_webhook_url, headers=headers, json=data)
    response.raise_for_status()

def task_failure_callback(context):
    send_alert_to_keep(
        dag_id=context["dag"].dag_id,
        task_id=context["task_instance"].task_id,
        # `execution_date` is deprecated in recent Airflow releases;
        # `context["logical_date"]` is the modern equivalent.
        execution_date=context["execution_date"],
        error_message=context.get("exception", "Unknown error"),
    )

dag = DAG(
    dag_id="keep_dag",
    default_args=default_args,
    description="A simple DAG with Keep integration",
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

task = BashOperator(
    task_id="keep_task",
    bash_command="exit 1",  # always fails, so the failure callback fires
    dag=dag,
    on_failure_callback=task_failure_callback,
)
```
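
To exercise the callback end to end without a running scheduler, you can execute the DAG in-process with `dag.test()` (available in Airflow 2.5+). This is a sketch and assumes `KEEP_API_KEY` is set in your environment:

```python
if __name__ == "__main__":
    # Runs the DAG in a single process; the bash task exits 1,
    # so task_failure_callback fires and posts the alert to Keep.
    dag.test()
```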

### Step 4: Observe Alerts in Keep

After setting up the above configuration, any failure in your Airflow tasks will trigger an alert that is sent to Keep via the configured webhook. You can then view, manage, and respond to these alerts using the Keep dashboard.

![Keep Alerts](/images/airflow_2.png)

## Useful Links

- [Airflow Documentation](https://airflow.apache.org/docs/apache-airflow/stable/index.html)
- [Airflow Callbacks](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/callbacks.html)
- [Airflow Connection](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html)
6 changes: 6 additions & 0 deletions docs/providers/overview.mdx
@@ -12,6 +12,12 @@ By leveraging Keep Providers, users are able to deeply integrate Keep with the t

<CardGroup cols={3}>

<Card
title="Airflow"
href="/providers/documentation/airflow-provider"
icon={ <img src="https://img.logo.dev/apache.org?token=pk_dfXfZBoKQMGDTIgqu7LvYg" /> }
></Card>

<Card
title="Azure AKS"
href="/providers/documentation/aks-provider"
@@ -19,6 +19,7 @@ export const ProvidersCategories = () => {
"Collaboration",
"CRM",
"Queues",
"Orchestration",
"Coming Soon",
"Others",
];
Expand Down
Binary file added keep-ui/public/icons/airflow-icon.png
1 change: 1 addition & 0 deletions keep-ui/shared/api/providers.ts
@@ -75,6 +75,7 @@ export type TProviderCategory =
| "Collaboration"
| "CRM"
| "Queues"
| "Orchestration"
| "Coming Soon"
| "Others";

Empty file.
111 changes: 111 additions & 0 deletions keep/providers/airflow_provider/airflow_provider.py
@@ -0,0 +1,111 @@
from datetime import datetime, timezone

from keep.api.models.alert import AlertDto
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.base.base_provider import BaseProvider
from keep.providers.models.provider_config import ProviderConfig


class AirflowProvider(BaseProvider):
    """Enrich alerts with data sent from Airflow."""

    PROVIDER_DISPLAY_NAME = "Airflow"
    PROVIDER_CATEGORY = ["Orchestration"]
    FINGERPRINT_FIELDS = ["fingerprint"]

    webhook_description = ""
    webhook_template = ""
    webhook_markdown = """
πŸ’‘ For more details on configuring Airflow to send alerts to Keep, refer to the [Keep documentation](https://docs.keephq.dev/providers/documentation/airflow-provider).

### 1. Configure Keep's Webhook Credentials
To send alerts to Keep, set up the webhook URL and API key:

- **Keep Webhook URL**: {keep_webhook_api_url}
- **Keep API Key**: {api_key}

### 2. Configure Airflow to Send Alerts to Keep
Airflow uses a callback function to send alerts to Keep. Below is an example configuration:

```python
import requests

def task_failure_callback(context):
    # Replace with your specific Keep webhook URL if different.
    keep_webhook_url = "{keep_webhook_api_url}"
    api_key = "{api_key}"

    headers = {{
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-API-KEY": api_key,
    }}

    data = {{
        "name": "Airflow Task Failure",
        "message": "Task failed in DAG",
        "status": "firing",
        "service": "pipeline",
        "severity": "critical",
    }}

    response = requests.post(keep_webhook_url, headers=headers, json=data)
    response.raise_for_status()
```

### 3. Attach the Callback to the DAG
Attach the failure callback to the DAG using the `on_failure_callback` parameter:

```python
from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="keep_dag",
    default_args=default_args,
    description="A simple DAG with Keep integration",
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    on_failure_callback=task_failure_callback,
)
```
"""

    def __init__(
        self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
    ):
        super().__init__(context_manager, provider_id, config)

    def validate_config(self):
        pass

    def dispose(self):
        """No need to dispose of anything, so just do nothing."""
        pass

    @staticmethod
    def _format_alert(
        event: dict, provider_instance: "BaseProvider" = None
    ) -> AlertDto:
        # Map the incoming webhook payload onto Keep's alert model,
        # falling back to sensible defaults for missing fields.
        alert = AlertDto(
            id=event.get("fingerprint"),
            fingerprint=event.get("fingerprint"),
            name=event.get("name", "Airflow Alert"),
            message=event.get("message"),
            description=event.get("description"),
            severity=event.get("severity", "critical"),
            status=event.get("status", "firing"),
            environment=event.get("environment", "undefined"),
            service=event.get("service"),
            source=["airflow"],
            url=event.get("url"),
            lastReceived=event.get(
                "lastReceived", datetime.now(tz=timezone.utc).isoformat()
            ),
            labels=event.get("labels", {}),
        )
        return alert
1 change: 1 addition & 0 deletions keep/providers/base/base_provider.py
@@ -66,6 +66,7 @@ class BaseProvider(metaclass=abc.ABCMeta):
"Organizational Tools",
"CRM",
"Queues",
"Orchestration",
"Others",
]
] = [