Description
First of all, thanks for the superb library @samuelcolvin & team! After trying (unsuccessfully) to set up async task processing with Celery, I finally decided to switch to Arq, especially given the team behind it.
While Arq handles async workloads natively, it does not yet support Pydantic serialization of task inputs and outputs out of the box. Given the ubiquity of Pydantic in modern Python applications, I wanted to explore whether there's interest in introducing native support for this.
Problem
Currently, passing Pydantic models to Arq tasks requires handling serialization/deserialization manually:
```python
from uuid import UUID, uuid4

from pydantic import BaseModel


# Define model
class Document(BaseModel):
    document_id: UUID
    content: str


# Task with manual serialization
async def process_document_task(ctx: WorkerContext, document_dict: dict) -> dict:
    document = Document.model_validate(document_dict)
    result = await process_document(document)
    return result.model_dump()


# Enqueue task (manual conversion required)
document = Document(document_id=uuid4(), content="Hello")
await redis.enqueue_job('process_document_task', document_dict=document.model_dump(mode="json"))
```
This approach has several downsides:
- Requires explicit serialization/deserialization everywhere
- Leads to loss of type hints and auto-completion benefits
- Can be error-prone if conversions are forgotten
- JSON serialization does not support UUIDs natively, requiring additional conversion
- Pickle, while supporting more Python types, is insecure when deserializing untrusted data and is not interoperable with non-Python consumers
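The UUID limitation in particular shows up immediately with the standard library, as this minimal snippet demonstrates:

```python
import json
from uuid import uuid4

# The standard json module cannot serialize UUID objects directly,
# so every UUID field has to be stringified by hand before encoding.
try:
    json.dumps({"document_id": uuid4()})
except TypeError as exc:
    print(exc)  # Object of type UUID is not JSON serializable
```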
Proposed Solution
To improve this, I implemented a Msgpack-based serializer that:
- Supports Pydantic models, lists/dicts of models, and UUIDs
- Automatically converts models during task enqueueing and execution
- Uses dynamic class loading to reconstruct Pydantic models at runtime
Why Msgpack?
- Msgpack is typically more compact and faster than JSON when dealing with structured data.
- Unlike Pickle, Msgpack does not execute arbitrary code during deserialization, making it safer.
- It supports dicts, lists, and other common Python structures without extensive manual handling.
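The compactness and round-trip claims are easy to check with the `msgpack` package itself (the example payload is mine, not from Arq):

```python
import json

import msgpack  # third-party: pip install msgpack

payload = {"document_id": "123e4567-e89b-12d3-a456-426614174000", "content": "Hello"}

packed = msgpack.packb(payload)     # bytes with length-prefixed fields, no quoting overhead
restored = msgpack.unpackb(packed)  # lossless round-trip for plain types

assert restored == payload
assert len(packed) < len(json.dumps(payload).encode())  # smaller than the JSON encoding here
```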
The core of the solution involves:
- Adding normalization before serialization
  - Supports Pydantic's BaseModel, UUIDs, lists, tuples, and dicts as the most common objects requiring additional handling with msgpack
- Adding denormalization after deserialization
  - Pydantic models are reconstructed dynamically from their fully qualified names, and UUIDs are converted back to their native type
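The dynamic class loading step (the `get_model_class` helper the serializer relies on) can be sketched with `importlib`; the caching and the exact helper signature are my own, not part of Arq:

```python
import importlib
from functools import lru_cache


@lru_cache(maxsize=None)
def get_model_class(qualified_name: str) -> type:
    """Resolve a class from its fully qualified name, e.g.
    'myapp.models.Document' -> the Document class.

    Note: this naive version breaks for classes nested inside other
    classes, since their __qualname__ contains extra dots."""
    module_path, _, class_name = qualified_name.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
```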
Example usage with this approach:
```python
async def process_document_task(ctx: WorkerContext, document: Document) -> Document:
    result = await process_document(document)
    return result


# No need for manual conversion
document = Document(document_id=uuid4(), content="Hello")
await redis.enqueue_job('process_document_task', document=document)
```
Implementation Summary
I implemented a MsgpackSerializer class that seamlessly integrates into Arq’s WorkerSettings as job_serializer and job_deserializer.
Here’s a simplified version:
```python
import msgpack
from typing import Any
from uuid import UUID

from pydantic import BaseModel


class MsgpackSerializer:
    def serialize(self, obj: Any) -> bytes:
        return msgpack.packb(self._normalize(obj))

    def deserialize(self, data: bytes) -> Any:
        return self._denormalize(msgpack.unpackb(data, raw=False))

    def _normalize(self, obj: Any) -> Any:
        # Tag models with their fully qualified class name so the worker
        # can reconstruct them later.
        if isinstance(obj, BaseModel):
            return {
                "__pydantic__": f"{obj.__class__.__module__}.{obj.__class__.__qualname__}",
                "data": obj.model_dump(mode="json"),
            }
        elif isinstance(obj, UUID):
            return {"__uuid__": str(obj)}
        elif isinstance(obj, (list, tuple)):
            return [self._normalize(item) for item in obj]
        elif isinstance(obj, dict):
            return {str(key): self._normalize(value) for key, value in obj.items()}
        return obj

    def _denormalize(self, obj: Any) -> Any:
        if isinstance(obj, dict):
            if "__pydantic__" in obj:
                model_cls = get_model_class(obj["__pydantic__"])
                return model_cls.model_validate(obj["data"])
            elif "__uuid__" in obj:
                return UUID(obj["__uuid__"])
            return {key: self._denormalize(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self._denormalize(item) for item in obj]
        return obj
```
```python
# This is then integrated into Arq's settings as follows:
serializer = MsgpackSerializer()


class WorkerSettings:
    job_serializer = serializer.serialize
    job_deserializer = serializer.deserialize
```
Questions for the Arq Team
- Would you be open to adding native support for Pydantic serialization in Arq?
- If so, what would be the best approach to contribute this? Should it be a built-in feature, or is a plugin-based system preferred?
- Do you have any concerns regarding this approach? For example, performance trade-offs due to dynamic model loading or security considerations when deserializing user-controlled inputs?
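On that last point: one possible mitigation would be replacing blind importing with an explicit opt-in registry, so an untrusted payload cannot name an arbitrary class to import. A rough sketch of the idea (the registry and decorator names are hypothetical):

```python
# Hypothetical opt-in registry: only models registered via the decorator
# may be reconstructed during deserialization.
_MODEL_REGISTRY: dict[str, type] = {}


def register_model(cls: type) -> type:
    """Class decorator that opts a model into worker-side deserialization."""
    _MODEL_REGISTRY[f"{cls.__module__}.{cls.__qualname__}"] = cls
    return cls


def get_model_class(qualified_name: str) -> type:
    try:
        return _MODEL_REGISTRY[qualified_name]
    except KeyError:
        # Reject anything the application did not explicitly register.
        raise ValueError(f"Refusing to deserialize unregistered model: {qualified_name!r}")
```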
I’d love to get your thoughts on whether this aligns with Arq’s roadmap and how best to adapt my implementation to fit natively.
Thanks again for the great work on Arq!
P.S. I wrote an article providing a more detailed overview of the solution and its limitations here