enhancement: allow control over JSON serialization, nested_asyncio, and Context class #12
Description
Hi, I would like to suggest a few enhancements to the SDK.
Version: 0.23.0
In my own workflow library, I've had to implement a few patches to work around the existing behavior of the SDK.
JSON Serialization
If there were a way to supply a custom JSON serialization/deserialization method, it would be extremely useful for handling the input, especially with object hooks. It's very easy to forget about a datetime.datetime object in the input, and if the ser/deser hook could handle those inputs, it would give the end user more flexibility.
I currently patch the behavior of the following:
hatchet_sdk.worker.Worker
hatchet_sdk.context.Context
hatchet_sdk.context.ChildWorkflowRef
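To illustrate the kind of hook I mean, here is a minimal sketch using only the standard library json module. The function names and the "__datetime__" tag are made up for this example and are not part of hatchet_sdk; the idea is only that the SDK could accept a pair of callables like these instead of calling json.dumps / json.loads directly.

```python
import json
from datetime import datetime
from typing import Any


def encode_default(obj: Any) -> Any:
    """Fallback used by json.dumps for objects it cannot serialize itself."""
    if isinstance(obj, datetime):
        # Tag the value so the decoder can recognise it (tag name is illustrative).
        return {"__datetime__": obj.isoformat()}
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def decode_object_hook(d: dict) -> Any:
    """Called by json.loads for every decoded JSON object."""
    if "__datetime__" in d:
        return datetime.fromisoformat(d["__datetime__"])
    return d


def dumps_with_hooks(value: Any) -> str:
    return json.dumps(value, default=encode_default)


def loads_with_hooks(text: str) -> Any:
    return json.loads(text, object_hook=decode_object_hook)


payload = {"run_at": datetime(2024, 1, 2, 3, 4, 5), "retries": 3}
round_tripped = loads_with_hooks(dumps_with_hooks(payload))
```

With something like this, a workflow input containing a datetime survives the round trip without the caller having to convert it by hand.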
nest_asyncio
My use case is that I would like to spawn multiple workflows, each with its own workers, at the same time. Because of nest_asyncio, this can cause problems with downstream libraries that rely on the same EventLoop, depending on when hatchet_sdk is imported and nest_asyncio.apply() runs.
- Task <Task pending name='Task-414' coro=<BaseComponent.ahandle_data_one_iter() running at .../types/component/base.py:482> cb=[_wait.<locals>._on_completion() at .../lib/python3.10/asyncio/tasks.py:475]> got Future <Future pending> attached to a different loop
It would be helpful if I were able to disable nest_asyncio.apply() with an env var, understanding the implications of doing so.
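As a rough sketch of what the opt-out could look like: the env var name HATCHET_DISABLE_NEST_ASYNCIO and both helper functions are hypothetical, chosen only for this example. The only real call is nest_asyncio.apply(), which is imported lazily so that the disabled path never touches the library.

```python
import os
from typing import Mapping

# Hypothetical env var name, used only for this illustration.
DISABLE_ENV_VAR = "HATCHET_DISABLE_NEST_ASYNCIO"


def should_apply_nest_asyncio(environ: Mapping[str, str] = os.environ) -> bool:
    """Return False when the user has opted out via the env var."""
    value = environ.get(DISABLE_ENV_VAR, "")
    return value.strip().lower() not in {"1", "true", "yes"}


def maybe_apply_nest_asyncio(environ: Mapping[str, str] = os.environ) -> bool:
    """Patch the event loop unless disabled; return whether it was patched."""
    if not should_apply_nest_asyncio(environ):
        return False
    import nest_asyncio  # imported lazily so the opt-out skips it entirely

    nest_asyncio.apply()
    return True
```

The default stays the same as today (patching is applied), so only users who set the variable would see a change in behavior.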
Context Class
It would be very useful to let the user provide a custom Context class that the Worker uses to create the Context within workflow steps. This would enable the user to add properties or methods.
Currently, I have it implemented as:
class Worker:
    """
    The Worker Object
    """

    def __init__(
        self,
        name: str,
        max_runs: int | None = None,
        debug: bool = False,
        handle_kill: bool = True,
        config: ClientConfig = None,
        context_class: Type[Context] = Context,
    ):
        ...
        self._context_cls = context_class

    def handle_start_step_run(self, action: Action):
        """
        Handles the start step run action
        """
        action_name = action.action_id
        context = self._context_cls(action, self.client)
        ...

    def handle_start_group_key_run(self, action: Action):
        """
        Handles the start group key run action
        """
        action_name = action.action_id
        context = self._context_cls(action, self.client)
        ...
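For completeness, here is how that would look from the user's side. This is a self-contained sketch: Action, Context, and Worker below are simplified stand-ins written for this example, not the real hatchet_sdk classes, and TenantContext with its tenant field is a made-up subclass. It only shows the context_class parameter being honored.

```python
from typing import Any, Type


class Action:
    """Stand-in for the SDK's action object (fields are illustrative)."""

    def __init__(self, action_id: str, tenant: str):
        self.action_id = action_id
        self.tenant = tenant


class Context:
    """Stand-in for the SDK's Context."""

    def __init__(self, action: Action, client: Any):
        self.action = action
        self.client = client


class Worker:
    """Stand-in Worker that builds contexts from a user-supplied class."""

    def __init__(self, name: str, context_class: Type[Context] = Context, client: Any = None):
        self.name = name
        self.client = client
        self._context_cls = context_class

    def handle_start_step_run(self, action: Action) -> Context:
        # The real worker would go on to run the step; here we just return the context.
        return self._context_cls(action, self.client)


class TenantContext(Context):
    """User-defined context adding a property and a helper method."""

    @property
    def tenant(self) -> str:
        return self.action.tenant

    def log_prefix(self) -> str:
        return f"[{self.tenant}:{self.action.action_id}]"


worker = Worker("demo-worker", context_class=TenantContext)
ctx = worker.handle_start_step_run(Action("step:one", tenant="acme"))
```

Steps then receive a TenantContext and can call ctx.log_prefix() directly, while workers created without context_class keep getting the plain Context.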
Thanks!