From 7f819219342a1d8408a75cbfc3c45d89721c3e7f Mon Sep 17 00:00:00 2001 From: srhinos <6531393+srhinos@users.noreply.github.com> Date: Tue, 20 Aug 2024 11:39:10 -0400 Subject: [PATCH] Add Ability to Fetch Workflow Run Step Errors in Step Context --- hatchet_sdk/context/context.py | 26 ++++++++++++++++++++++++++ hatchet_sdk/worker/runner/runner.py | 7 +++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/hatchet_sdk/context/context.py b/hatchet_sdk/context/context.py index 986f03b2..ccfced90 100644 --- a/hatchet_sdk/context/context.py +++ b/hatchet_sdk/context/context.py @@ -4,6 +4,7 @@ from concurrent.futures import Future, ThreadPoolExecutor from hatchet_sdk.clients.events import EventClient +from hatchet_sdk.clients.rest_client import RestApi from hatchet_sdk.clients.run_event_listener import RunEventListenerClient from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener from hatchet_sdk.context.worker_context import WorkerContext @@ -65,6 +66,7 @@ def __init__( dispatcher_client: DispatcherClient, admin_client: AdminClient, event_client: EventClient, + rest_client: RestApi, workflow_listener: PooledWorkflowRunListener, workflow_run_event_listener: RunEventListenerClient, worker: WorkerContext, @@ -74,6 +76,7 @@ def __init__( self.dispatcher_client = dispatcher_client self.admin_client = admin_client self.event_client = event_client + self.rest_client = rest_client self.workflow_listener = workflow_listener self.workflow_run_event_listener = workflow_run_event_listener self.namespace = namespace @@ -116,6 +119,7 @@ def __init__( dispatcher_client: DispatcherClient, admin_client: AdminClient, event_client: EventClient, + rest_client: RestApi, workflow_listener: PooledWorkflowRunListener, workflow_run_event_listener: RunEventListenerClient, worker: WorkerContext, @@ -128,6 +132,7 @@ def __init__( dispatcher_client, admin_client, event_client, + rest_client, workflow_listener, workflow_run_event_listener, worker, @@ -157,6 +162,7 @@ def __init__( self.dispatcher_client = dispatcher_client self.admin_client = admin_client self.event_client = event_client + self.rest_client = rest_client self.workflow_listener = workflow_listener self.workflow_run_event_listener = workflow_run_event_listener self.namespace = namespace @@ -291,3 +297,23 @@ def child_key(self): def parent_workflow_run_id(self): return self.action.parent_workflow_run_id + + def fetch_run_failures(self): + data = self.rest_client.workflow_run_get(self.action.workflow_run_id) + other_job_runs = [ + run for run in data.job_runs if run.job_id != self.action.job_id + ] + # TODO: Parse Step Runs using a Pydantic Model rather than a hand crafted dictionary + failed_step_runs = [ + { + "step_id": step_run.step_id, + "step_run_action_name": step_run.step.action, + "error": step_run.error, + } + for job_run in other_job_runs + if job_run.step_runs + for step_run in job_run.step_runs + if step_run.error + ] + + return failed_step_runs diff --git a/hatchet_sdk/worker/runner/runner.py b/hatchet_sdk/worker/runner/runner.py index af2f896a..c9f005a6 100644 --- a/hatchet_sdk/worker/runner/runner.py +++ b/hatchet_sdk/worker/runner/runner.py @@ -334,6 +334,7 @@ async def handle_start_step_run(self, action: Action): self.dispatcher_client, self.admin_client, self.client.event, + self.client.rest, self.client.workflow_listener, self.workflow_run_event_listener, self.worker_context, @@ -345,6 +346,7 @@ async def handle_start_step_run(self, action: Action): self.dispatcher_client, self.admin_client, self.client.event, + self.client.rest, self.client.workflow_listener, self.workflow_run_event_listener, self.worker_context, @@ -373,7 +375,7 @@ async def handle_start_step_run(self, action: Action): try: await task - except Exception as e: + except Exception: # do nothing, this should be caught in the callback pass @@ -384,6 +386,7 @@ async def handle_start_group_key_run(self, action: Action): self.dispatcher_client, self.admin_client, self.client.event, + self.client.rest, self.client.workflow_listener, self.workflow_run_event_listener, self.worker_context, @@ -415,7 +418,7 @@ async def handle_start_group_key_run(self, action: Action): try: await task - except Exception as e: + except Exception: # do nothing, this should be caught in the callback pass