Skip to content

Commit

Permalink
refactor: change set message with header to immutable
Browse files Browse the repository at this point in the history
  • Loading branch information
diegodfreire committed Aug 10, 2023
1 parent e0dab73 commit 82442aa
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions django_cloud_tasks/tasks/publisher_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@ def asap(cls, message: dict, attributes: dict[str, str] | None = None):

def run(self, message: dict, attributes: dict[str, str] | None = None, headers: dict[str, str] | None = None):
# Cloud PubSub does not support headers, but we simulate them with a key in the data property
self._set_message_headers(message=message, headers=headers)
message = self._build_message_with_headers(message=message, headers=headers)

return self._get_publisher_client().publish(
message=serialize(value=message),
topic_id=self.topic_name(),
attributes=attributes,
)

def _set_message_headers(self, message: dict, headers: dict | None = None):
def _build_message_with_headers(self, message: dict, headers: dict | None = None):
message = message.copy()
headers = get_current_headers() | (headers or {})
if headers:
message[self._app.propagated_headers_key] = headers
return message

@classmethod
def set_up(cls) -> None:
Expand Down Expand Up @@ -89,7 +91,7 @@ def asap(cls, obj: Model, **kwargs):
def run(
self, message: dict, topic_name: str, attributes: dict[str, str] | None, headers: dict[str, str] | None = None
):
self._set_message_headers(message=message, headers=headers)
message = self._build_message_with_headers(message=message, headers=headers)
return self._get_publisher_client().publish(
message=serialize(value=message),
topic_id=topic_name,
Expand Down

0 comments on commit 82442aa

Please sign in to comment.