Skip to content

Commit

Permalink
fix: Storage Queue message format breaking change
Browse files Browse the repository at this point in the history
See: f0748d4
  • Loading branch information
clemlesne committed Aug 21, 2024
1 parent c275ed6 commit 46f70b1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 11 deletions.
26 changes: 21 additions & 5 deletions app/persistence/azure_queue_storage.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from base64 import b64decode, b64encode
from binascii import Error as BinasciiError
from typing import Any, AsyncGenerator

from azure.core.exceptions import (
ResourceExistsError,
ResourceNotFoundError,
ServiceRequestError,
)
from azure.storage.queue import TextBase64DecodePolicy, TextBase64EncodePolicy
from azure.storage.queue.aio import QueueClient, QueueServiceClient
from pydantic import BaseModel
from tenacity import (
Expand Down Expand Up @@ -47,7 +48,7 @@ async def send_message(
self,
message: str,
) -> None:
await self._client.send_message(message)
await self._client.send_message(self._escape(message))

@retry(
reraise=True,
Expand All @@ -66,7 +67,7 @@ async def receive_messages(
)
async for message in messages:
yield Message(
content=message.content,
content=self._unescape(message.content),
delete_token=message.pop_receipt,
dequeue_count=message.dequeue_count,
message_id=message.id,
Expand Down Expand Up @@ -99,13 +100,28 @@ async def delete_queue(
await self._client.delete_queue()
logger.info('Deleted Queue Storage "%s"', self._config.name)

def _escape(self, value: str) -> str:
"""
Escape value to base64 encoding.
"""
return b64encode(value.encode(self.encoding)).decode(self.encoding)

def _unescape(self, value: str) -> str:
"""
Unescape value from base64 encoding.
If the value is not base64 encoded, return the original value as string. This will handle retro-compatibility with old messages.
"""
try:
return b64decode(value.encode(self.encoding)).decode(self.encoding)
except (UnicodeDecodeError, BinasciiError):
return value

async def __aenter__(self) -> "AzureQueueStorage":
self._service = QueueServiceClient.from_connection_string(
self._config.connection_string
)
self._client = self._service.get_queue_client(
message_decode_policy=TextBase64DecodePolicy(),
message_encode_policy=TextBase64EncodePolicy(),
queue=self._config.name,
)
# Create if it does not exist
Expand Down
2 changes: 2 additions & 0 deletions app/persistence/iqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class Provider(str, Enum):


class IQueue:
encoding = "utf-8"

@abstractmethod
async def send_message(
self,
Expand Down
14 changes: 11 additions & 3 deletions tests/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,20 @@ async def _validate(


def _random_name() -> str:
"""
Generate a random name with 32 characters.
All lowercase letters and digits are used.
"""
return "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(32)
)


def _random_content() -> str:
return "".join(
random.choice(string.printable) for _ in range(random.randint(1, 512))
)
"""
Generate a random content with a length of 512 characters.
All printable ASCII characters are used.
"""
return "".join(random.choice(string.printable) for _ in range(512))
14 changes: 11 additions & 3 deletions tests/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,20 @@ async def test_send_many(provider: QueueProvider) -> None:


def _random_name() -> str:
"""
Generate a random name with 32 characters.
All lowercase letters and digits are used.
"""
return "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(32)
)


def _random_content() -> str:
return "".join(
random.choice(string.printable) for _ in range(random.randint(1, 512))
)
"""
Generate a random content with a length of 512 characters.
All printable ASCII characters are used.
"""
return "".join(random.choice(string.printable) for _ in range(512))

0 comments on commit 46f70b1

Please sign in to comment.