Skip to content

Commit

Permalink
test: organizing test in specific folders
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasgomide committed Jul 12, 2023
1 parent e368691 commit d3c38f0
Show file tree
Hide file tree
Showing 11 changed files with 451 additions and 349 deletions.
11 changes: 10 additions & 1 deletion sample_project/sample_app/tests/tests_base_tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import inspect
from contextlib import contextmanager
from typing import Callable, Type

from django.test import SimpleTestCase
from django.utils.connection import ConnectionProxy
from gcp_pilot.mocker import patch_auth

LockType = (Callable | Exception | Type[Exception]) | None


class AuthenticationMixin(SimpleTestCase):
def setUp(self) -> None:
auth = patch_auth()
auth.start()
self.addCleanup(auth.stop)
super().setUp()


def patch_cache_lock(
lock_side_effect: LockType = None,
unlock_side_effect: LockType = None,
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from unittest.mock import call, patch

from django.test import TestCase
from django_cloud_tasks.tests import factories


class PipelineModelTest(TestCase):
def test_start_pipeline(self):
pipeline = factories.PipelineFactory()
leaf_already_completed = factories.RoutineWithoutSignalFactory(status="completed")
pipeline.routines.add(leaf_already_completed)

leaf_already_reverted = factories.RoutineWithoutSignalFactory(status="reverted")
pipeline.routines.add(leaf_already_reverted)

with patch("django_cloud_tasks.tasks.RoutineExecutorTask.asap") as task:
with self.assertNumQueries(1):
pipeline.start()
task.assert_not_called()

second_routine = factories.RoutineFactory()
pipeline.routines.add(second_routine)
third_routine = factories.RoutineFactory()
pipeline.routines.add(third_routine)
first_routine = factories.RoutineFactory()
pipeline.routines.add(first_routine)

another_first_routine = factories.RoutineFactory()
pipeline.routines.add(another_first_routine)

factories.RoutineVertexFactory(routine=second_routine, next_routine=third_routine)
factories.RoutineVertexFactory(routine=first_routine, next_routine=second_routine)

with patch("django_cloud_tasks.tasks.RoutineExecutorTask.asap") as task:
with self.assertNumQueries(7):
pipeline.start()
calls = [call(routine_id=first_routine.pk), call(routine_id=another_first_routine.pk)]
task.assert_has_calls(calls, any_order=True)

def test_revert_pipeline(self):
pipeline = factories.PipelineFactory()

leaf_already_reverted = factories.RoutineWithoutSignalFactory(status="reverted")
pipeline.routines.add(leaf_already_reverted)

with patch("django_cloud_tasks.tasks.RoutineReverterTask.asap") as task:
with self.assertNumQueries(1):
pipeline.revert()
task.assert_not_called()

second_routine = factories.RoutineFactory()
pipeline.routines.add(second_routine)

third_routine = factories.RoutineWithoutSignalFactory(status="completed")
pipeline.routines.add(third_routine)

first_routine = factories.RoutineFactory()
pipeline.routines.add(first_routine)

fourth_routine = factories.RoutineWithoutSignalFactory(status="completed")
pipeline.routines.add(fourth_routine)

factories.RoutineVertexFactory(routine=second_routine, next_routine=third_routine)
factories.RoutineVertexFactory(routine=first_routine, next_routine=second_routine)

with patch("django_cloud_tasks.tasks.RoutineReverterTask.asap") as task:
with self.assertNumQueries(7):
pipeline.revert()
calls = [
call(routine_id=fourth_routine.pk),
call(routine_id=third_routine.pk),
]
task.assert_has_calls(calls, any_order=True)

def test_add_routine(self):
pipeline = factories.PipelineFactory()
expected_routine_1 = {
"task_name": "DummyRoutineTask",
"body": {"spell": "wingardium leviosa"},
}
routine = pipeline.add_routine(expected_routine_1)
self.assertEqual(expected_routine_1["body"], routine.body)
self.assertEqual(expected_routine_1["task_name"], routine.task_name)
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from typing import List
from unittest.mock import call, patch

from django.core.exceptions import ValidationError
from django.test import TestCase
from django.utils import timezone
from freezegun import freeze_time
from django.db import IntegrityError
from django_cloud_tasks import models
from django_cloud_tasks.tests import factories


Expand Down Expand Up @@ -150,142 +148,3 @@ def test_ensure_max_retries_greater_than_attempt_count(self):
expected_exception=IntegrityError, expected_regex="constraint failed: max_retries_less_than_attempt_count"
):
factories.RoutineFactory(max_retries=1, attempt_count=5)


class PipelineModelTest(TestCase):
def test_start_pipeline(self):
pipeline = factories.PipelineFactory()
leaf_already_completed = factories.RoutineWithoutSignalFactory(status="completed")
pipeline.routines.add(leaf_already_completed)

leaf_already_reverted = factories.RoutineWithoutSignalFactory(status="reverted")
pipeline.routines.add(leaf_already_reverted)

with patch("django_cloud_tasks.tasks.RoutineExecutorTask.asap") as task:
with self.assertNumQueries(1):
pipeline.start()
task.assert_not_called()

second_routine = factories.RoutineFactory()
pipeline.routines.add(second_routine)
third_routine = factories.RoutineFactory()
pipeline.routines.add(third_routine)
first_routine = factories.RoutineFactory()
pipeline.routines.add(first_routine)

another_first_routine = factories.RoutineFactory()
pipeline.routines.add(another_first_routine)

factories.RoutineVertexFactory(routine=second_routine, next_routine=third_routine)
factories.RoutineVertexFactory(routine=first_routine, next_routine=second_routine)

with patch("django_cloud_tasks.tasks.RoutineExecutorTask.asap") as task:
with self.assertNumQueries(7):
pipeline.start()
calls = [call(routine_id=first_routine.pk), call(routine_id=another_first_routine.pk)]
task.assert_has_calls(calls, any_order=True)

def test_revert_pipeline(self):
pipeline = factories.PipelineFactory()

leaf_already_reverted = factories.RoutineWithoutSignalFactory(status="reverted")
pipeline.routines.add(leaf_already_reverted)

with patch("django_cloud_tasks.tasks.RoutineReverterTask.asap") as task:
with self.assertNumQueries(1):
pipeline.revert()
task.assert_not_called()

second_routine = factories.RoutineFactory()
pipeline.routines.add(second_routine)

third_routine = factories.RoutineWithoutSignalFactory(status="completed")
pipeline.routines.add(third_routine)

first_routine = factories.RoutineFactory()
pipeline.routines.add(first_routine)

fourth_routine = factories.RoutineWithoutSignalFactory(status="completed")
pipeline.routines.add(fourth_routine)

factories.RoutineVertexFactory(routine=second_routine, next_routine=third_routine)
factories.RoutineVertexFactory(routine=first_routine, next_routine=second_routine)

with patch("django_cloud_tasks.tasks.RoutineReverterTask.asap") as task:
with self.assertNumQueries(7):
pipeline.revert()
calls = [
call(routine_id=fourth_routine.pk),
call(routine_id=third_routine.pk),
]
task.assert_has_calls(calls, any_order=True)

def test_add_routine(self):
pipeline = factories.PipelineFactory()
expected_routine_1 = {
"task_name": "DummyRoutineTask",
"body": {"spell": "wingardium leviosa"},
}
routine = pipeline.add_routine(expected_routine_1)
self.assertEqual(expected_routine_1["body"], routine.body)
self.assertEqual(expected_routine_1["task_name"], routine.task_name)


class RoutineStateMachineTest(TestCase):
def setUp(self):
super().setUp()
revert_routine_task = patch("django_cloud_tasks.tasks.RoutineReverterTask.asap")
routine_task = patch("django_cloud_tasks.tasks.RoutineExecutorTask.asap")
routine_task.start()
revert_routine_task.start()
self.addCleanup(routine_task.stop)
self.addCleanup(revert_routine_task.stop)

def _status_list(self, ignore_items: list) -> list:
statuses = models.Routine.Statuses.values
for item in ignore_items:
statuses.remove(item)
return statuses

def test_dont_allow_initial_status_not_equal_pending(self):
for status in self._status_list(ignore_items=["pending", "failed", "scheduled"]):
msg_error = f"The initial routine's status must be 'pending' not '{status}'"
with self.assertRaises(ValidationError, msg=msg_error):
factories.RoutineFactory(status=status)

def test_ignore_if_status_was_not_updated(self):
routine = factories.RoutineFactory(status="pending")
routine.status = "pending"
routine.save()

def test_allow_to_update_status_from_scheduled_to_running_or_failed(self):
self.assert_machine_status(accepted_status=["running", "failed", "reverting"], from_status="scheduled")

def test_allow_to_update_status_from_running_to_completed(self):
self.assert_machine_status(
accepted_status=["completed", "failed"],
from_status="running",
)

def test_allow_to_update_status_from_completed_to_failed_or_reverting(self):
self.assert_machine_status(accepted_status=["reverting"], from_status="completed")

def test_allow_to_update_status_from_reverting_to_reverted(self):
self.assert_machine_status(
accepted_status=["reverted"],
from_status="reverting",
)

def assert_machine_status(self, from_status: str, accepted_status: List[str]):
for status in accepted_status:
routine = factories.RoutineWithoutSignalFactory(status=from_status)
routine.status = status
routine.save()

accepted_status.append(from_status)
for status in self._status_list(ignore_items=accepted_status):
msg_error = f"Status update from '{from_status}' to '{status}' is not allowed"
with self.assertRaises(ValidationError, msg=msg_error):
routine = factories.RoutineWithoutSignalFactory(status=from_status)
routine.status = status
routine.save()
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import List
from unittest.mock import patch

from django.core.exceptions import ValidationError
from django.test import TestCase
from django_cloud_tasks import models
from django_cloud_tasks.tests import factories


class RoutineStateMachineTest(TestCase):
def setUp(self):
super().setUp()
revert_routine_task = patch("django_cloud_tasks.tasks.RoutineReverterTask.asap")
routine_task = patch("django_cloud_tasks.tasks.RoutineExecutorTask.asap")
routine_task.start()
revert_routine_task.start()
self.addCleanup(routine_task.stop)
self.addCleanup(revert_routine_task.stop)

def _status_list(self, ignore_items: list) -> list:
statuses = models.Routine.Statuses.values
for item in ignore_items:
statuses.remove(item)
return statuses

def test_dont_allow_initial_status_not_equal_pending(self):
for status in self._status_list(ignore_items=["pending", "failed", "scheduled"]):
msg_error = f"The initial routine's status must be 'pending' not '{status}'"
with self.assertRaises(ValidationError, msg=msg_error):
factories.RoutineFactory(status=status)

def test_ignore_if_status_was_not_updated(self):
routine = factories.RoutineFactory(status="pending")
routine.status = "pending"
routine.save()

def test_allow_to_update_status_from_scheduled_to_running_or_failed(self):
self.assert_machine_status(accepted_status=["running", "failed", "reverting"], from_status="scheduled")

def test_allow_to_update_status_from_running_to_completed(self):
self.assert_machine_status(
accepted_status=["completed", "failed"],
from_status="running",
)

def test_allow_to_update_status_from_completed_to_failed_or_reverting(self):
self.assert_machine_status(accepted_status=["reverting"], from_status="completed")

def test_allow_to_update_status_from_reverting_to_reverted(self):
self.assert_machine_status(
accepted_status=["reverted"],
from_status="reverting",
)

def assert_machine_status(self, from_status: str, accepted_status: List[str]):
for status in accepted_status:
routine = factories.RoutineWithoutSignalFactory(status=from_status)
routine.status = status
routine.save()

accepted_status.append(from_status)
for status in self._status_list(ignore_items=accepted_status):
msg_error = f"Status update from '{from_status}' to '{status}' is not allowed"
with self.assertRaises(ValidationError, msg=msg_error):
routine = factories.RoutineWithoutSignalFactory(status=from_status)
routine.status = status
routine.save()
Loading

0 comments on commit d3c38f0

Please sign in to comment.