Skip to content

Commit

Permalink
Update Example / Add Test
Browse files Browse the repository at this point in the history
  • Loading branch information
srhinos committed Aug 20, 2024
1 parent 7f81921 commit 950f9f7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
38 changes: 28 additions & 10 deletions examples/on_failure/test_on_failure.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
# from hatchet_sdk import Hatchet
# import pytest
import asyncio

# from tests.utils import fixture_bg_worker
# from tests.utils.hatchet_client import hatchet_client_fixture
import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.clients.rest.models.job_run_status import JobRunStatus
from tests.utils import fixture_bg_worker
from tests.utils.hatchet_client import hatchet_client_fixture

# hatchet = hatchet_client_fixture()
# worker = fixture_bg_worker(["poetry", "run", "manual_trigger"])
hatchet = hatchet_client_fixture()
worker = fixture_bg_worker(["poetry", "run", "on_failure"])

# # requires scope module or higher for shared event loop
# @pytest.mark.asyncio(scope="session")
# async def test_run(hatchet: Hatchet):
# # TODO

# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run_timeout(hatchet: Hatchet):
run = hatchet.admin.run_workflow("OnFailureWorkflow", {})
try:
await run.result()
assert False, "Expected workflow to timeout"
except Exception as e:
assert "step1 failed" in str(e)

await asyncio.sleep(2) # Wait for the on_failure job to finish

job_runs = hatchet.rest.workflow_run_get(run.workflow_run_id).job_runs
assert len(job_runs) == 2

successful_job_runs = [jr for jr in job_runs if jr.status == JobRunStatus.SUCCEEDED]
failed_job_runs = [jr for jr in job_runs if jr.status == JobRunStatus.FAILED]
assert len(successful_job_runs) == 1
assert len(failed_job_runs) == 1
10 changes: 7 additions & 3 deletions examples/on_failure/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@

@hatchet.workflow(on_events=["user:create"])
class OnFailureWorkflow:
@hatchet.step()
@hatchet.step(timeout="1s")
def step1(self, context: Context):
raise Exception("step1 failed")

@hatchet.on_failure_step()
def on_failure(self, context):
def on_failure(self, context: Context):
failures = context.fetch_run_failures()
print("executed on_failure")
print(context)
print(json.dumps(failures, indent=2))
if len(failures) == 1 and "step1 failed" in failures[0]["error"]:
return {"status": "success"}
raise Exception("unexpected failure")


def main():
Expand Down

0 comments on commit 950f9f7

Please sign in to comment.