
Scheduler pod crash because of Deadline set on DAG #58560

@Ferdinanddb

Description

Apache Airflow version

3.1.3

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

I have two DAGs using the new deadline alerts feature. I did a simple test before setting the deadline on these DAGs, and it worked fine.

Here is an example of a DAG I have with a deadline alert:

with DAG(
    dag_id=Path(__file__).parent.name,
    dag_display_name="🟡 " + ICEBERG_FULL_TABLE_REF,
    schedule="0 0 * * *",
    default_args=default_args,
    tags={
        "some",
        "tags",
    },
    max_active_runs=1,
    catchup=False,
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(hours=10),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={
                "slack_webhook_conn_id": "slack_default",
                "text": "some text",
            },
        ),
    ),
    on_failure_callback=default_dag_failure_slack_webhook_notification,
) as dag:
    ...  # some Airflow tasks

The scheduler pod is crashing (CrashLoopBackOff state) with the following error:

2025-11-21T08:59:50.012752Z [info     ] Exited execute loop            [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1094
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 7, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1078, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1368, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1478, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/retries.py", line 97, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/retries.py", line 106, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1888, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 2004, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/dagrun.py", line 1231, in update_state
    Deadline.prune_deadlines(session=session, conditions={DagRun.run_id: self.run_id})
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/deadline.py", line 182, in prune_deadlines
    if dagrun.end_date <= deadline.deadline_time:
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: '<=' not supported between instances of 'NoneType' and 'datetime.datetime'
stream closed: EOF for airflow/airflow-scheduler-67599768b9-7d9pj (scheduler)

The only way I have found so far to resolve the situation is to comment out the Deadline logic in the DAG (so it is no longer set).
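
Looking at the last two frames of the traceback, the comparison fails because the DAG run's end_date is still None (the run has not finished yet) while the deadline row already has a concrete deadline_time. A minimal standalone sketch of just that comparison, with made-up values, reproduces the same TypeError:

from datetime import datetime, timezone

# The DAG run has not finished yet, so its end_date is still None, while the
# deadline row already carries a concrete deadline_time.
dagrun_end_date = None  # run is still queued/running
deadline_time = datetime(2025, 11, 21, 10, 0, tzinfo=timezone.utc)

try:
    dagrun_end_date <= deadline_time
except TypeError as err:
    print(err)  # '<=' not supported between instances of 'NoneType' and 'datetime.datetime'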

What you think should happen instead?

The scheduler pod should not crash. This is a bit dangerous, as a single DAG can impact all the others: one DAG crashing the scheduler prevents every other DAG from being scheduled.
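
A None-safe check in the pruning logic would avoid the crash. As a hypothetical sketch (not the actual Airflow implementation), the condition at deadline.py line 182 could simply skip runs that have no end_date yet:

from datetime import datetime

def should_prune(end_date: datetime | None, deadline_time: datetime) -> bool:
    # Hypothetical None-safe version of the check: a run without an end_date
    # (still queued or running) is not pruned, instead of letting the
    # comparison raise a TypeError and crash the scheduler loop.
    return end_date is not None and end_date <= deadline_time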

How to reproduce

I don't really know how to reproduce it reliably, but my DAG is basic:

from datetime import datetime, timedelta
from pathlib import Path

from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from airflow.sdk import DAG, Asset, TaskGroup

from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineAlert, DeadlineReference
from airflow_richfox.custom.utils.gcs import upload_spark_config_to_gcs
from airflow_richfox.custom.utils.slack import (
    deadline_exceeded_slack_text_message,
    default_dag_failure_slack_webhook_notification,
)

ICEBERG_CATALOG_NAME = "biglakeCatalog"
ICEBERG_TABLE_REF = "gold.some_table"
ICEBERG_FULL_TABLE_REF = f"{ICEBERG_CATALOG_NAME}.{ICEBERG_TABLE_REF}"

# Define the Asset for this DAG
asset_table_gold_algoseek_sec_master_full_listed_current = Asset(f"x-iceberg://{ICEBERG_FULL_TABLE_REF}")

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 4,
    "start_date": datetime(2024, 4, 29),
    "retry_delay": timedelta(minutes=4),
}

# Update your DAG to use parallel processing
with DAG(
    dag_id=Path(__file__).parent.name,
    dag_display_name="🟡 " + ICEBERG_FULL_TABLE_REF,
    schedule="0 0 * * *",
    default_args=default_args,
    tags={
        "some",
        "gold",
        "tag",
    },
    max_active_runs=1,
    catchup=False,
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(hours=10),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={
                "slack_webhook_conn_id": "slack_default",
                "text": deadline_exceeded_slack_text_message,
            },
        ),
    ),
    on_failure_callback=default_dag_failure_slack_webhook_notification,
) as dag:
    with TaskGroup("wait_for_source_silver_dags") as wait_for_source_silver_dags:
        for source_dagname in [
            "silver_dag1",
            "silver_dag2",
            "silver_dag3",
            "silver_dag4",
            "silver_dag5",
            "silver_dag6",
        ]:
            wait_for_dag = ExternalTaskSensor(
                task_id=f"wait_for_dag_{source_dagname}",
                external_dag_id=source_dagname,
                timeout=60 * 60 * 12,  # 12 hours
                poll_interval=60 * 3.0,  # 3 minutes
                deferrable=True,
                soft_fail=False,
                execution_date_fn=lambda dt: dt,
            )

    full_reload_job = SparkKubernetesOperator(
        task_id="full_reload_job",
        namespace="spark-operator",
        application_file="spark_app/full_reload_job/spark_application_config.yml",
        kubernetes_conn_id="kubernetes_default",
        random_name_suffix=True,
        get_logs=True,
        reattach_on_restart=True,
        delete_on_termination=False,
        do_xcom_push=False,
        deferrable=True,
        base_container_status_polling_interval=60,
        on_execute_callback=upload_spark_config_to_gcs,
        outlets=[asset_table_gold_algoseek_sec_master_full_listed_current],
    )

    maintenance_job = SparkKubernetesOperator(
        task_id="maintenance_job",
        namespace="spark-operator",
        application_file="spark_app/maintenance_job/spark_application_config.yml",
        kubernetes_conn_id="kubernetes_default",
        random_name_suffix=True,
        get_logs=True,
        reattach_on_restart=True,
        delete_on_termination=False,
        do_xcom_push=False,
        deferrable=True,
        on_execute_callback=upload_spark_config_to_gcs,
    )

    wait_for_source_silver_dags >> full_reload_job >> maintenance_job

Operating System

Official Airflow image: docker.io/apache/airflow:3.1.1-python3.12

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

I use the official constraints file to install dependencies.

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
