Skip to content

Commit

Permalink
Fix task id validation in BaseOperator (#44939)
Browse files Browse the repository at this point in the history
* Fix task id validation in BaseOperator

* add additional tests to check task id length

* fix assert statement
  • Loading branch information
gopidesupavan authored Dec 15, 2024
1 parent 9475a23 commit e100d78
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/definitions/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ def __init__(
f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
f"Invalid arguments were:\n**kwargs: {kwargs}",
)
validate_key(task_id)
validate_key(self.task_id)

self.owner = owner
self.email = email
Expand Down
27 changes: 27 additions & 0 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,33 @@ def test_chain(self):
assert [op2] == tgop3.get_direct_relatives(upstream=False)
assert [op2] == tgop4.get_direct_relatives(upstream=False)

def test_baseoperator_raises_exception_when_task_id_plus_taskgroup_id_exceeds_250_chars(self):
"""Test exception is raised when operator task id + taskgroup id > 250 chars."""
dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())

tg1 = TaskGroup("A" * 20, dag=dag)
with pytest.raises(ValueError, match="The key has to be less than 250 characters"):
BaseOperator(task_id="1" * 250, task_group=tg1, dag=dag)

def test_baseoperator_with_task_id_and_taskgroup_id_less_than_250_chars(self):
"""Test exception is not raised when operator task id + taskgroup id < 250 chars."""
dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())

tg1 = TaskGroup("A" * 10, dag=dag)
try:
BaseOperator(task_id="1" * 239, task_group=tg1, dag=dag)
except Exception as e:
pytest.fail(f"Exception raised: {e}")

def test_baseoperator_with_task_id_less_than_250_chars(self):
"""Test exception is not raised when operator task id < 250 chars."""
dag = DAG(dag_id="foo", schedule=None, start_date=datetime.now())

try:
BaseOperator(task_id="1" * 249, dag=dag)
except Exception as e:
pytest.fail(f"Exception raised: {e}")

def test_chain_linear(self):
dag = DAG(dag_id="test_chain_linear", schedule=None, start_date=datetime.now())

Expand Down

0 comments on commit e100d78

Please sign in to comment.