From 7a4f21703347ceb078c982d4f5bde955a7b2db67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Daher?= Date: Mon, 8 Feb 2021 15:00:52 -0300 Subject: [PATCH] Define task prefix from settings --- django_cloud_tasks/apps.py | 7 ++-- .../management/commands/schedule_tasks.py | 10 +----- django_cloud_tasks/tasks.py | 36 ++++++++++--------- pyproject.toml | 2 +- .../sample_app/tests/tests_commands.py | 22 ++++++------ 5 files changed, 38 insertions(+), 39 deletions(-) diff --git a/django_cloud_tasks/apps.py b/django_cloud_tasks/apps.py index db23648..1d3dfe3 100644 --- a/django_cloud_tasks/apps.py +++ b/django_cloud_tasks/apps.py @@ -18,6 +18,7 @@ def __init__(self, *args, **kwargs): self.periodic_tasks = {} self.subscriber_tasks = {} self.domain = self._fetch_config(name='GOOGLE_CLOUD_TASKS_ENDPOINT', default='http://localhost:8080') + self.app_name = self._fetch_config(name='GOOGLE_CLOUD_TASKS_APP_NAME', default=os.environ.get('APP_NAME', None)) def _fetch_config(self, name, default): return getattr(settings, name, os.environ.get(name, default)) @@ -35,16 +36,16 @@ def register_task(self, task_class): container[task_class.name()] = task_class break - def schedule_tasks(self, delete_by_prefix: str = None) -> Tuple[List[str], List[str]]: + def schedule_tasks(self) -> Tuple[List[str], List[str]]: updated = [] removed = [] for task_name, task_klass in self.periodic_tasks.items(): task_klass().delay() updated.append(task_name) - if delete_by_prefix: + if self.app_name: client = CloudScheduler() - for job in client.list(prefix=delete_by_prefix): + for job in client.list(prefix=self.app_name): task_name = job.name.split('/jobs/')[-1] if task_name not in updated: client.delete(name=task_name) diff --git a/django_cloud_tasks/management/commands/schedule_tasks.py b/django_cloud_tasks/management/commands/schedule_tasks.py index 010074b..6dae712 100644 --- a/django_cloud_tasks/management/commands/schedule_tasks.py +++ b/django_cloud_tasks/management/commands/schedule_tasks.py @@ -7,15 +7,7 @@ class Command(BaseInitCommand): action = 'schedule' name = 'tasks' - def add_arguments(self, parser): - parser.add_argument( - '--prefix', - action='store', - help='Job name prefix to be used to safely detect obsolete jobs', - ) - def perform_init(self, app_config, *args, **options) -> List[str]: - prefix = options['prefix'] - updated, deleted = app_config.schedule_tasks(delete_by_prefix=prefix) + updated, deleted = app_config.schedule_tasks() return [f"[+] {name}" for name in updated] + [f"[-] {name}" for name in deleted] diff --git a/django_cloud_tasks/tasks.py b/django_cloud_tasks/tasks.py index 9dffba2..0361cdb 100644 --- a/django_cloud_tasks/tasks.py +++ b/django_cloud_tasks/tasks.py @@ -14,10 +14,13 @@ class TaskMeta(type): def __new__(cls, name, bases, attrs): + app = apps.get_app_config('django_cloud_tasks') + attrs['_app_name'] = app.app_name + klass = type.__new__(cls, name, bases, attrs) if getattr(klass, 'abstract', False) and 'abstract' not in attrs: setattr(klass, 'abstract', False) # TODO Removing the attribute would be better - TaskMeta._register_task(task_class=klass) + TaskMeta._register_task(app=app, task_class=klass) return klass def __call__(cls, *args, **kwargs): @@ -26,8 +29,7 @@ def __call__(cls, *args, **kwargs): raise NotImplementedError(f"Do not instantiate a {cls.__name__}. Inherit and create your own.") @staticmethod - def _register_task(task_class): - app = apps.get_app_config('django_cloud_tasks') + def _register_task(app, task_class): if task_class.__name__ not in ['Task', 'PeriodicTask', 'SubscriberTask']: app.register_task(task_class=task_class) @@ -64,7 +66,7 @@ def name(cls): @property def queue(self): - return 'tasks' + return self._app_name or 'tasks' @classmethod def url(cls): @@ -100,6 +102,8 @@ def delay(self, **kwargs): @property def schedule_name(self): + if self._app_name: + return f'{self._app_name}-{self.name()}' return self.name() @property @@ -107,17 +111,11 @@ def __client(self): return CloudScheduler() -class PubSubTaskMixin: +class SubscriberTask(Task): + abstract = True _use_oidc_auth = True _url_name = 'subscriptions-endpoint' - @property - @abstractmethod - def topic_name(self): - raise NotImplementedError() - - -class SubscriberTask(PubSubTaskMixin, Task): @abstractmethod def run(self, message, attributes): raise NotImplementedError() @@ -138,7 +136,7 @@ def topic_name(self): @property def subscription_name(self): - return self.name() + return self._app_name or self.name() @property def __client(self): @@ -152,7 +150,7 @@ def run(self, topic_name: str, message: Dict, attributes: Dict[str, str] = None) return run_coroutine( handler=self.__client.publish, message=json.dumps(message), - topic_id=topic_name, + topic_id=self._full_topic_name(name=topic_name), attributes=attributes, ) @@ -163,15 +161,21 @@ def delay(self, topic_name: str, message: Dict, attributes: Dict[str, str] = Non # - receiving it through the endpoint # - and the finally publishing to PubSub # might be useful to use the Cloud Task throttling - return super().delay(topic_name=topic_name, message=message, attributes=attributes) + full_topic_name = self._full_topic_name(name=topic_name) + return super().delay(topic_name=full_topic_name, message=message, attributes=attributes) return self.run(topic_name=topic_name, message=message, attributes=attributes) def initialize(self, topic_name): run_coroutine( handler=self.__client.create_topic, - topic_id=topic_name, + topic_id=self._full_topic_name(name=topic_name), ) + def _full_topic_name(self, name): + if self._app_name: + return f'{self._app_name}-{name}' + return name + @property def __client(self): return CloudPublisher() diff --git a/pyproject.toml b/pyproject.toml index b3a2f8a..1bca9a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "django-google-cloud-tasks" -version = "0.3.7" +version = "0.4.0" description = "Async Tasks with HTTP endpoints" authors = ["Joao Daher "] packages = [ diff --git a/sample_project/sample_app/tests/tests_commands.py b/sample_project/sample_app/tests/tests_commands.py index d202bc7..679da80 100644 --- a/sample_project/sample_app/tests/tests_commands.py +++ b/sample_project/sample_app/tests/tests_commands.py @@ -1,7 +1,8 @@ from io import StringIO from typing import List -from unittest.mock import patch, Mock +from unittest.mock import patch, Mock, PropertyMock +from django.apps import apps from django.core.management import call_command from django.test import SimpleTestCase from gcp_pilot.mocker import patch_auth @@ -67,15 +68,16 @@ def test_schedule_tasks_with_obsolete(self): '\n- [-] potato_task_2\n' names = ['potato_task_1', 'potato_task_2'] - with self.patch_get_scheduled(names=names): - with self.patch_delete_schedule() as delete: - self._assert_command( - command='schedule_tasks', - params=['--prefix', 'potato'], - expected_schedule_calls=1, - expected_output=expected_output, - ) - self.assertEqual(2, delete.call_count) + app_config = apps.get_app_config('django_cloud_tasks') + with patch.object(app_config, 'app_name', new_callable=PropertyMock, return_value='potato'): + with self.patch_get_scheduled(names=names): + with self.patch_delete_schedule() as delete: + self._assert_command( + command='schedule_tasks', + expected_schedule_calls=1, + expected_output=expected_output, + ) + self.assertEqual(2, delete.call_count) def test_initialize_subscribers(self): expected_output = 'Successfully initialized 1 subscribers to domain http://localhost:8080' \