Skip to content

Commit

Permalink
Add Ability to Fetch Workflow Run Step Errors in Step Context
Browse files Browse the repository at this point in the history
  • Loading branch information
srhinos committed Aug 20, 2024
1 parent af89ee9 commit 7f81921
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
26 changes: 26 additions & 0 deletions hatchet_sdk/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from concurrent.futures import Future, ThreadPoolExecutor

from hatchet_sdk.clients.events import EventClient
from hatchet_sdk.clients.rest_client import RestApi
from hatchet_sdk.clients.run_event_listener import RunEventListenerClient
from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
from hatchet_sdk.context.worker_context import WorkerContext
Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(
dispatcher_client: DispatcherClient,
admin_client: AdminClient,
event_client: EventClient,
rest_client: RestApi,
workflow_listener: PooledWorkflowRunListener,
workflow_run_event_listener: RunEventListenerClient,
worker: WorkerContext,
Expand All @@ -74,6 +76,7 @@ def __init__(
self.dispatcher_client = dispatcher_client
self.admin_client = admin_client
self.event_client = event_client
self.rest_client = rest_client
self.workflow_listener = workflow_listener
self.workflow_run_event_listener = workflow_run_event_listener
self.namespace = namespace
Expand Down Expand Up @@ -116,6 +119,7 @@ def __init__(
dispatcher_client: DispatcherClient,
admin_client: AdminClient,
event_client: EventClient,
rest_client: RestApi,
workflow_listener: PooledWorkflowRunListener,
workflow_run_event_listener: RunEventListenerClient,
worker: WorkerContext,
Expand All @@ -128,6 +132,7 @@ def __init__(
dispatcher_client,
admin_client,
event_client,
rest_client,
workflow_listener,
workflow_run_event_listener,
worker,
Expand Down Expand Up @@ -157,6 +162,7 @@ def __init__(
self.dispatcher_client = dispatcher_client
self.admin_client = admin_client
self.event_client = event_client
self.rest_client = rest_client
self.workflow_listener = workflow_listener
self.workflow_run_event_listener = workflow_run_event_listener
self.namespace = namespace
Expand Down Expand Up @@ -291,3 +297,23 @@ def child_key(self):

def parent_workflow_run_id(self):
return self.action.parent_workflow_run_id

def fetch_run_failures(self):
data = self.rest_client.workflow_run_get(self.action.workflow_run_id)
other_job_runs = [
run for run in data.job_runs if run.job_id != self.action.job_id
]
# TODO: Parse Step Runs using a Pydantic Model rather than a hand crafted dictionary
failed_step_runs = [
{
"step_id": step_run.step_id,
"step_run_action_name": step_run.step.action,
"error": step_run.error,
}
for job_run in other_job_runs
if job_run.step_runs
for step_run in job_run.step_runs
if step_run.error
]

return failed_step_runs
7 changes: 5 additions & 2 deletions hatchet_sdk/worker/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ async def handle_start_step_run(self, action: Action):
self.dispatcher_client,
self.admin_client,
self.client.event,
self.client.rest,
self.client.workflow_listener,
self.workflow_run_event_listener,
self.worker_context,
Expand All @@ -345,6 +346,7 @@ async def handle_start_step_run(self, action: Action):
self.dispatcher_client,
self.admin_client,
self.client.event,
self.client.rest,
self.client.workflow_listener,
self.workflow_run_event_listener,
self.worker_context,
Expand Down Expand Up @@ -373,7 +375,7 @@ async def handle_start_step_run(self, action: Action):

try:
await task
except Exception as e:
except Exception:
# do nothing, this should be caught in the callback
pass

Expand All @@ -384,6 +386,7 @@ async def handle_start_group_key_run(self, action: Action):
self.dispatcher_client,
self.admin_client,
self.client.event,
self.client.rest,
self.client.workflow_listener,
self.workflow_run_event_listener,
self.worker_context,
Expand Down Expand Up @@ -415,7 +418,7 @@ async def handle_start_group_key_run(self, action: Action):

try:
await task
except Exception as e:
except Exception:
# do nothing, this should be caught in the callback
pass

Expand Down

0 comments on commit 7f81921

Please sign in to comment.