From 177b088451adb1c9acc7883190b1743fe71ca730 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Thu, 30 May 2024 21:30:23 -0400 Subject: [PATCH 1/3] fix: admin aio client config --- hatchet_sdk/clients/admin.py | 1 + pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/hatchet_sdk/clients/admin.py b/hatchet_sdk/clients/admin.py index c94dedf6..c2d47d76 100644 --- a/hatchet_sdk/clients/admin.py +++ b/hatchet_sdk/clients/admin.py @@ -122,6 +122,7 @@ def _prepare_schedule_workflow_request( class AdminClientAioImpl(AdminClientBase): def __init__(self, config: ClientConfig): aio_conn = new_conn(config, True) + self.config = config self.aio_client = WorkflowServiceStub(aio_conn) self.token = config.token self.listener_client = new_listener(config) diff --git a/pyproject.toml b/pyproject.toml index 892664ac..8edbe434 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "0.25.1" +version = "0.25.2" description = "" authors = ["Alexander Belanger "] readme = "README.md" From 185927bec8103ea59ae7eef0bd9484cda308d17f Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Thu, 30 May 2024 21:58:12 -0400 Subject: [PATCH 2/3] fix: broken workflow listener --- hatchet_sdk/context.py | 15 +++++++++------ hatchet_sdk/worker.py | 4 ++++ hatchet_sdk/workflow_run.py | 4 ++-- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/hatchet_sdk/context.py b/hatchet_sdk/context.py index 151f5449..3f019947 100644 --- a/hatchet_sdk/context.py +++ b/hatchet_sdk/context.py @@ -48,6 +48,7 @@ def __init__( admin_client: AdminClientImpl, event_client: EventClientImpl, workflow_listener: PooledWorkflowRunListener, + workflow_run_event_listener: RunEventListenerClient, namespace: str = "", ): self.action = action @@ -55,18 +56,17 @@ def __init__( self.admin_client = admin_client self.event_client = event_client self.workflow_listener = workflow_listener + self.workflow_run_event_listener = workflow_run_event_listener self.namespace = namespace self.spawn_index = -1 async def spawn_workflow( self, workflow_name: str, input: dict = {}, key: str = None - ): + ) -> WorkflowRunRef: options = self._prepare_workflow_options(key) - child_workflow_run_id = await self.admin_client.aio.run_workflow( + return await self.admin_client.aio.run_workflow( workflow_name, input, options ) - return WorkflowRunRef(child_workflow_run_id, self.workflow_listener) - class Context(BaseContext): spawn_index = -1 @@ -78,6 +78,7 @@ def __init__( admin_client: AdminClientImpl, event_client: EventClientImpl, workflow_listener: PooledWorkflowRunListener, + workflow_run_event_listener: RunEventListenerClient, namespace: str = "", ): self.aio = ContextAioImpl( @@ -86,6 +87,7 @@ def __init__( admin_client, event_client, workflow_listener, + workflow_run_event_listener, namespace, ) @@ -110,6 +112,7 @@ def __init__( self.admin_client = admin_client self.event_client = event_client self.workflow_listener = workflow_listener + self.workflow_run_event_listener = workflow_run_event_listener self.namespace = namespace # FIXME: this limits the number of concurrent log requests to 1, which means we can do about @@ -171,10 +174,10 @@ def spawn_workflow(self, workflow_name: str, input: dict = {}, key: str = None): workflow_name = f"{self.namespace}{workflow_name}" options = self._prepare_workflow_options(key) - child_workflow_run_id = self.admin_client.run_workflow( + + return self.admin_client.run_workflow( workflow_name, input, options ) - return WorkflowRunRef(child_workflow_run_id, self.workflow_listener) def _log(self, line: str): try: diff --git a/hatchet_sdk/worker.py b/hatchet_sdk/worker.py index 81f57afe..811ec815 100644 --- a/hatchet_sdk/worker.py +++ b/hatchet_sdk/worker.py @@ -16,6 +16,7 @@ from google.protobuf.timestamp_pb2 import Timestamp from hatchet_sdk.clients.admin import new_admin +from hatchet_sdk.clients.run_event_listener import new_listener from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener from hatchet_sdk.loader import ClientConfig @@ -153,6 +154,7 @@ async def handle_start_step_run(self, action: Action): self.admin_client, self.client.event, self.client.workflow_listener, + self.workflow_run_event_listener, self.client.config.namespace, ) self.contexts[action.step_run_id] = context @@ -193,6 +195,7 @@ async def handle_start_group_key_run(self, action: Action): self.admin_client, self.client.event, self.client.workflow_listener, + self.workflow_run_event_listener, self.client.config.namespace, ) @@ -482,6 +485,7 @@ async def async_start(self, retry_count=1): # otherwise the grpc.aio methods will use a different event loop and we'll get a bunch of errors. self.dispatcher_client = new_dispatcher(self.config) self.admin_client = new_admin(self.config) + self.workflow_run_event_listener = new_listener(self.config) self.client.workflow_listener = PooledWorkflowRunListener(self.config) self.listener: ActionListenerImpl = ( diff --git a/hatchet_sdk/workflow_run.py b/hatchet_sdk/workflow_run.py index 02da060d..e038a1c5 100644 --- a/hatchet_sdk/workflow_run.py +++ b/hatchet_sdk/workflow_run.py @@ -24,5 +24,5 @@ def __str__(self): def stream(self) -> RunEventListener: return self.workflow_run_event_listener.stream(self.workflow_run_id) - async def result(self): - return await self.workflow_listener.result(self.workflow_run_id) + def result(self): + return self.workflow_listener.result(self.workflow_run_id) From 474b5816ccca469f0cd46dbab73ee74723f781cd Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Thu, 30 May 2024 21:59:51 -0400 Subject: [PATCH 3/3] chore: lint --- hatchet_sdk/context.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/hatchet_sdk/context.py b/hatchet_sdk/context.py index 3f019947..e0fe87df 100644 --- a/hatchet_sdk/context.py +++ b/hatchet_sdk/context.py @@ -64,9 +64,8 @@ async def spawn_workflow( self, workflow_name: str, input: dict = {}, key: str = None ) -> WorkflowRunRef: options = self._prepare_workflow_options(key) - return await self.admin_client.aio.run_workflow( - workflow_name, input, options - ) + return await self.admin_client.aio.run_workflow(workflow_name, input, options) + class Context(BaseContext): spawn_index = -1 @@ -175,9 +174,7 @@ def spawn_workflow(self, workflow_name: str, input: dict = {}, key: str = None): options = self._prepare_workflow_options(key) - return self.admin_client.run_workflow( - workflow_name, input, options - ) + return self.admin_client.run_workflow(workflow_name, input, options) def _log(self, line: str): try: