Skip to content

Commit

Permalink
Define task prefix from settings
Browse files Browse the repository at this point in the history
  • Loading branch information
joaodaher committed Feb 8, 2021
1 parent 08f97ff commit 7a4f217
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 39 deletions.
7 changes: 4 additions & 3 deletions django_cloud_tasks/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self, *args, **kwargs):
self.periodic_tasks = {}
self.subscriber_tasks = {}
self.domain = self._fetch_config(name='GOOGLE_CLOUD_TASKS_ENDPOINT', default='http://localhost:8080')
self.app_name = self._fetch_config(name='GOOGLE_CLOUD_TASKS_APP_NAME', default=os.environ.get('APP_NAME', None))

def _fetch_config(self, name, default):
return getattr(settings, name, os.environ.get(name, default))
Expand All @@ -35,16 +36,16 @@ def register_task(self, task_class):
container[task_class.name()] = task_class
break

def schedule_tasks(self, delete_by_prefix: str = None) -> Tuple[List[str], List[str]]:
def schedule_tasks(self) -> Tuple[List[str], List[str]]:
updated = []
removed = []
for task_name, task_klass in self.periodic_tasks.items():
task_klass().delay()
updated.append(task_name)

if delete_by_prefix:
if self.app_name:
client = CloudScheduler()
for job in client.list(prefix=delete_by_prefix):
for job in client.list(prefix=self.app_name):
task_name = job.name.split('/jobs/')[-1]
if task_name not in updated:
client.delete(name=task_name)
Expand Down
10 changes: 1 addition & 9 deletions django_cloud_tasks/management/commands/schedule_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,7 @@ class Command(BaseInitCommand):
action = 'schedule'
name = 'tasks'

def add_arguments(self, parser):
parser.add_argument(
'--prefix',
action='store',
help='Job name prefix to be used to safely detect obsolete jobs',
)

def perform_init(self, app_config, *args, **options) -> List[str]:
prefix = options['prefix']
updated, deleted = app_config.schedule_tasks(delete_by_prefix=prefix)
updated, deleted = app_config.schedule_tasks()

return [f"[+] {name}" for name in updated] + [f"[-] {name}" for name in deleted]
36 changes: 20 additions & 16 deletions django_cloud_tasks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

class TaskMeta(type):
def __new__(cls, name, bases, attrs):
app = apps.get_app_config('django_cloud_tasks')
attrs['_app_name'] = app.app_name

klass = type.__new__(cls, name, bases, attrs)
if getattr(klass, 'abstract', False) and 'abstract' not in attrs:
setattr(klass, 'abstract', False) # TODO Removing the attribute would be better
TaskMeta._register_task(task_class=klass)
TaskMeta._register_task(app=app, task_class=klass)
return klass

def __call__(cls, *args, **kwargs):
Expand All @@ -26,8 +29,7 @@ def __call__(cls, *args, **kwargs):
raise NotImplementedError(f"Do not instantiate a {cls.__name__}. Inherit and create your own.")

@staticmethod
def _register_task(task_class):
app = apps.get_app_config('django_cloud_tasks')
def _register_task(app, task_class):
if task_class.__name__ not in ['Task', 'PeriodicTask', 'SubscriberTask']:
app.register_task(task_class=task_class)

Expand Down Expand Up @@ -64,7 +66,7 @@ def name(cls):

@property
def queue(self):
return 'tasks'
return self._app_name or 'tasks'

@classmethod
def url(cls):
Expand Down Expand Up @@ -100,24 +102,20 @@ def delay(self, **kwargs):

@property
def schedule_name(self):
if self._app_name:
return f'{self._app_name}-{self.name()}'
return self.name()

@property
def __client(self):
return CloudScheduler()


class PubSubTaskMixin:
class SubscriberTask(Task):
abstract = True
_use_oidc_auth = True
_url_name = 'subscriptions-endpoint'

@property
@abstractmethod
def topic_name(self):
raise NotImplementedError()


class SubscriberTask(PubSubTaskMixin, Task):
@abstractmethod
def run(self, message, attributes):
raise NotImplementedError()
Expand All @@ -138,7 +136,7 @@ def topic_name(self):

@property
def subscription_name(self):
return self.name()
return self._app_name or self.name()

@property
def __client(self):
Expand All @@ -152,7 +150,7 @@ def run(self, topic_name: str, message: Dict, attributes: Dict[str, str] = None)
return run_coroutine(
handler=self.__client.publish,
message=json.dumps(message),
topic_id=topic_name,
topic_id=self._full_topic_name(name=topic_name),
attributes=attributes,
)

Expand All @@ -163,15 +161,21 @@ def delay(self, topic_name: str, message: Dict, attributes: Dict[str, str] = Non
# - receiving it through the endpoint
# - and the finally publishing to PubSub
# might be useful to use the Cloud Task throttling
return super().delay(topic_name=topic_name, message=message, attributes=attributes)
full_topic_name = self._full_topic_name(name=topic_name)
return super().delay(topic_name=full_topic_name, message=message, attributes=attributes)
return self.run(topic_name=topic_name, message=message, attributes=attributes)

def initialize(self, topic_name):
run_coroutine(
handler=self.__client.create_topic,
topic_id=topic_name,
topic_id=self._full_topic_name(name=topic_name),
)

def _full_topic_name(self, name):
if self._app_name:
return f'{self._app_name}-{name}'
return name

@property
def __client(self):
return CloudPublisher()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "django-google-cloud-tasks"
version = "0.3.7"
version = "0.4.0"
description = "Async Tasks with HTTP endpoints"
authors = ["Joao Daher <[email protected]>"]
packages = [
Expand Down
22 changes: 12 additions & 10 deletions sample_project/sample_app/tests/tests_commands.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from io import StringIO
from typing import List
from unittest.mock import patch, Mock
from unittest.mock import patch, Mock, PropertyMock

from django.apps import apps
from django.core.management import call_command
from django.test import SimpleTestCase
from gcp_pilot.mocker import patch_auth
Expand Down Expand Up @@ -67,15 +68,16 @@ def test_schedule_tasks_with_obsolete(self):
'\n- [-] potato_task_2\n'

names = ['potato_task_1', 'potato_task_2']
with self.patch_get_scheduled(names=names):
with self.patch_delete_schedule() as delete:
self._assert_command(
command='schedule_tasks',
params=['--prefix', 'potato'],
expected_schedule_calls=1,
expected_output=expected_output,
)
self.assertEqual(2, delete.call_count)
app_config = apps.get_app_config('django_cloud_tasks')
with patch.object(app_config, 'app_name', new_callable=PropertyMock, return_value='potato'):
with self.patch_get_scheduled(names=names):
with self.patch_delete_schedule() as delete:
self._assert_command(
command='schedule_tasks',
expected_schedule_calls=1,
expected_output=expected_output,
)
self.assertEqual(2, delete.call_count)

def test_initialize_subscribers(self):
expected_output = 'Successfully initialized 1 subscribers to domain http://localhost:8080' \
Expand Down

0 comments on commit 7a4f217

Please sign in to comment.