diff --git a/airflow_prometheus_exporter/prometheus_exporter.py b/airflow_prometheus_exporter/prometheus_exporter.py index 232d458..c9813e3 100644 --- a/airflow_prometheus_exporter/prometheus_exporter.py +++ b/airflow_prometheus_exporter/prometheus_exporter.py @@ -6,7 +6,7 @@ from airflow.configuration import conf from airflow.models import DagModel, DagRun, TaskInstance, TaskFail, XCom from airflow.plugins_manager import AirflowPlugin -from airflow.settings import RBAC, Session +from airflow.settings import Session from airflow.utils.state import State from airflow.utils.log.logging_mixin import LoggingMixin from flask import Response @@ -18,6 +18,7 @@ from airflow_prometheus_exporter.xcom_config import load_xcom_config CANARY_DAG = "canary_dag" +RBAC = "True" @contextmanager @@ -260,7 +261,10 @@ def get_task_duration_info(): TaskInstance.task_id, TaskInstance.start_date, TaskInstance.end_date, - TaskInstance.execution_date, + DagRun.execution_date, + ) + .join( + DagRun, TaskInstance.run_id == DagRun.run_id ) .join( max_execution_dt_query, @@ -318,7 +322,7 @@ def get_task_scheduler_delay(): return ( session.query( task_status_query.c.queue, - TaskInstance.execution_date, + DagRun.execution_date, TaskInstance.queued_dttm, task_status_query.c.max_start.label("start_date"), ) @@ -329,6 +333,9 @@ def get_task_scheduler_delay(): TaskInstance.start_date == task_status_query.c.max_start, ), ) + .join( + DagRun, TaskInstance.run_id == DagRun.run_id + ) .filter( TaskInstance.dag_id == CANARY_DAG, # Redundant, for performance.