Skip to content

Commit

Permalink
AIP-72: Handling AirflowException in task sdk (#45308)
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghrajesh authored Dec 31, 2024
1 parent 4af49a0 commit 24caa2b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
10 changes: 9 additions & 1 deletion task_sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,17 @@ def run(ti: RuntimeTaskInstance, log: Logger):
)

# TODO: Run task failure callbacks here
except (AirflowTaskTimeout, AirflowException):
except AirflowTaskTimeout:
# TODO: handle the case of up_for_retry here
# TODO: coagulate this exception handling with AirflowException
# once https://github.com/apache/airflow/issues/45307 is handled
...
except AirflowException:
# TODO: handle the case of up_for_retry here
msg = TaskState(
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
)
except AirflowTaskTerminated:
# External state updates are already handled with `ti_heartbeat` and will be
# updated already be another UI API. So, these exceptions should ideally never be thrown.
Expand Down
41 changes: 41 additions & 0 deletions task_sdk/tests/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from uuid6 import uuid7

from airflow.exceptions import (
AirflowException,
AirflowFailException,
AirflowSensorTimeout,
AirflowSkipException,
Expand Down Expand Up @@ -330,6 +331,46 @@ def test_run_raises_system_exit(time_machine, mocked_parse, make_ti_context, moc
)


def test_run_raises_airflow_exception(time_machine, mocked_parse, make_ti_context, mock_supervisor_comms):
"""Test running a basic task that exits with AirflowException."""
from airflow.providers.standard.operators.python import PythonOperator

task = PythonOperator(
task_id="af_exception_task",
python_callable=lambda: (_ for _ in ()).throw(
AirflowException("Oops! I am failing with AirflowException!"),
),
)

what = StartupDetails(
ti=TaskInstance(
id=uuid7(),
task_id="af_exception_task",
dag_id="basic_dag_af_exception",
run_id="c",
try_number=1,
),
file="",
requests_fd=0,
ti_context=make_ti_context(),
)

ti = mocked_parse(what, "basic_dag_af_exception", task)

instant = timezone.datetime(2024, 12, 3, 10, 0)
time_machine.move_to(instant, tick=False)

run(ti, log=mock.MagicMock())

mock_supervisor_comms.send_request.assert_called_once_with(
msg=TaskState(
state=TerminalTIState.FAILED,
end_date=instant,
),
log=mock.ANY,
)


def test_startup_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comms):
"""Test running a DAG with templated task."""
from airflow.providers.standard.operators.bash import BashOperator
Expand Down

0 comments on commit 24caa2b

Please sign in to comment.