Skip to content

Commit

Permalink
fix: do not break when parsing headers from non-json messages
Browse files Browse the repository at this point in the history
  • Loading branch information
joaodaher committed Apr 12, 2023
1 parent 23647c0 commit 33e2931
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
7 changes: 6 additions & 1 deletion django_cloud_tasks/middleware/pubsub_headers_middleware.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from json import JSONDecodeError
from typing import Any

from django.apps import apps
Expand Down Expand Up @@ -41,7 +42,11 @@ def is_subscriber_route(self, request) -> bool:
return request.path == expected_url

def extract_headers(self, request) -> dict[str, Any]:
message = Message.load(body=request.body)
try:
message = Message.load(body=request.body)
except JSONDecodeError:
logger.warning("Message received through PubSub is not a valid JSON. Ignoring PubSub headers feature.")
return {}

headers = {}
for key, value in message.attributes.items():
Expand Down
8 changes: 8 additions & 0 deletions django_cloud_tasks/tasks/subscriber_task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import abc
import json
from urllib.parse import urljoin
from typing import Callable

from cachetools.func import lru_cache
from django.urls import reverse
Expand All @@ -21,6 +23,12 @@ class SubscriberTask(Task, abc.ABC):
def run(self, content: dict, attributes: dict[str, str] | None = None):
raise NotImplementedError()

@classmethod
def message_parser(cls) -> Callable:
# The callable used to parse the message content
# By default, we handle JSON messages
return json.loads

@classmethod
def set_up(cls):
return cls._get_subscriber_client().create_or_update_subscription(
Expand Down
8 changes: 4 additions & 4 deletions django_cloud_tasks/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def post(self, request, task_name, *args, **kwargs):
result = {"error": f"Task {task_name} not found"}
return JsonResponse(status=404, data=result)

task_kwargs = self.parse_input(request=request)
task_kwargs = self.parse_input(request=request, task_class=task_class)
task_metadata = self.parse_metadata(request=request)
try:
output = self.execute_task(task_class=task_class, task_metadata=task_metadata, task_kwargs=task_kwargs)
Expand All @@ -39,7 +39,7 @@ def get_task(self, name: str) -> Type[Task]:
def execute_task(self, task_class: type[Task], task_metadata: TaskMetadata, task_kwargs: dict) -> Any:
return task_class(metadata=task_metadata).process(**task_kwargs)

def parse_input(self, request) -> dict:
def parse_input(self, request, task_class: Type[Task]) -> dict:
return deserialize(value=request.body)

def parse_metadata(self, request) -> TaskMetadata:
Expand All @@ -48,8 +48,8 @@ def parse_metadata(self, request) -> TaskMetadata:

# More info: https://cloud.google.com/pubsub/docs/push#receiving_messages
class GoogleCloudSubscribeView(GoogleCloudTaskView):
def parse_input(self, request) -> dict:
message = Message.load(body=request.body)
def parse_input(self, request, task_class: Type[SubscriberTask]) -> dict:
message = Message.load(body=request.body, parser=task_class.message_parser())
return {
"content": message.data,
"attributes": message.attributes,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "django-google-cloud-tasks"
version = "2.1.4"
version = "2.1.5"
description = "Async Tasks with HTTP endpoints"
authors = ["Joao Daher <[email protected]>"]
packages = [
Expand Down

0 comments on commit 33e2931

Please sign in to comment.