Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: light refactor and e2e tests #108

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: e2e

on: [pull_request]

jobs:
build:

runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Display Python version
run: python -c "import sys; print(sys.version)"
- name: Install Poetry
uses: snok/install-poetry@v1
with:
version: 1.5.1
virtualenvs-create: true
virtualenvs-in-project: true
- name: Install dependencies
run: poetry install --no-interaction
- name: Set HATCHET_CLIENT_NAMESPACE
run: |
PYTHON_VERSION=$(python -c "import sys; print(f'py{sys.version_info.major}{sys.version_info.minor}')")
SHORT_SHA=$(git rev-parse --short HEAD)
echo "HATCHET_CLIENT_NAMESPACE=${PYTHON_VERSION}-${SHORT_SHA}" >> $GITHUB_ENV
- name: Run pytest
env:
HATCHET_CLIENT_TOKEN: ${{ secrets.HATCHET_CLIENT_TOKEN }}
run: |
echo "Using HATCHET_CLIENT_NAMESPACE: $HATCHET_CLIENT_NAMESPACE"
poetry run pytest
11 changes: 11 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"cSpell.words": [
"dotenv",
"reqs"
]
}
1 change: 1 addition & 0 deletions examples/_deprecated/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The examples and tests in this directory are deprecated, but we're maintaining them to ensure backwards compatibility.
26 changes: 26 additions & 0 deletions examples/_deprecated/test_event_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest
from dotenv import load_dotenv

from hatchet_sdk import new_client
from hatchet_sdk.hatchet import Hatchet

load_dotenv()


@pytest.mark.asyncio(scope="session")
async def test_direct_client_event():
client = new_client()
e = client.event.push("user:create", {"test": "test"})

assert e.eventId is not None


@pytest.mark.filterwarnings(
"ignore:Direct access to client is deprecated:DeprecationWarning"
)
@pytest.mark.asyncio(scope="session")
async def test_hatchet_client_event():
hatchet = Hatchet()
e = hatchet.client.event.push("user:create", {"test": "test"})

assert e.eventId is not None
18 changes: 12 additions & 6 deletions examples/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@

hatchet = Hatchet(debug=True)

list: WorkflowList = hatchet.client.rest().workflow_list()

for workflow in list.rows:
print(workflow.name)
print(workflow.metadata.id)
print(workflow.metadata.created_at)
print(workflow.metadata.updated_at)
def main():
list: WorkflowList = hatchet.rest.workflow_list()

for workflow in list.rows:
print(workflow.name)
print(workflow.metadata.id)
print(workflow.metadata.created_at)
print(workflow.metadata.updated_at)


if __name__ == "__main__":
main()
14 changes: 14 additions & 0 deletions examples/api/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils.hatchet_client import hatchet_client_fixture

hatchet = hatchet_client_fixture()


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_list_workflows(hatchet: Hatchet):
list = hatchet.rest.workflow_list()

assert len(list.rows) != 0
8 changes: 8 additions & 0 deletions examples/async/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dotenv import load_dotenv

from hatchet_sdk import Hatchet

load_dotenv()

hatchet = Hatchet()
hatchet.event.push("async:create", {"test": "test"})
9 changes: 0 additions & 9 deletions examples/async/event_test.py

This file was deleted.

23 changes: 23 additions & 0 deletions examples/async/test_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker
from tests.utils.hatchet_client import hatchet_client_fixture

hatchet = hatchet_client_fixture()
worker = fixture_bg_worker(["poetry", "run", "async"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
run = hatchet.admin.run_workflow("AsyncWorkflow", {})
result = await run.result()
assert result["step1"]["test"] == "test"


@pytest.mark.asyncio(scope="session")
async def test_run_async(hatchet: Hatchet):
run = await hatchet.admin.aio.run_workflow("AsyncWorkflow", {})
result = await run.result()
assert result["step1"]["test"] == "test"
20 changes: 9 additions & 11 deletions examples/async/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,31 @@
hatchet = Hatchet(debug=True)


@hatchet.workflow(on_events=["user:create"])
@hatchet.workflow(on_events=["async:create"])
class AsyncWorkflow:
def __init__(self):
self.my_value = "test"

@hatchet.step(timeout="2s")
async def step1(self, context: Context):
context.refresh_timeout("5s")

print("started step1")
await asyncio.sleep(3)
print("finished step1")

return {"test": "test"}

@hatchet.step(parents=["step1"], timeout="4s")
async def step2(self, context):
print("started async step2")
await asyncio.sleep(2)
print("finished step2")


async def main():
async def _main():
workflow = AsyncWorkflow()
worker = hatchet.worker("test-worker", max_runs=4)
worker = hatchet.worker("async-worker", max_runs=4)
worker.register_workflow(workflow)
await worker.async_start()


asyncio.run(main())
def main():
asyncio.run(_main())


if __name__ == "__main__":
main()
17 changes: 17 additions & 0 deletions examples/cancellation/test_cancellation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pytest

from hatchet_sdk import Hatchet
from tests.utils import fixture_bg_worker
from tests.utils.hatchet_client import hatchet_client_fixture

hatchet = hatchet_client_fixture()
worker = fixture_bg_worker(["poetry", "run", "cancellation"])


# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
run = hatchet.admin.run_workflow("CancelWorkflow", {})
result = await run.result()
# TODO is this the expected result for a timed out run...
assert result == {}
9 changes: 3 additions & 6 deletions examples/cancellation/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,20 @@

@hatchet.workflow(on_events=["user:create"])
class CancelWorkflow:
def __init__(self):
self.my_value = "test"

@hatchet.step(timeout="10s", retries=1)
async def step1(self, context: Context):
i = 0
while not context.exit_flag.is_set() and i < 20:
while not context.exit_flag and i < 20:
print(f"Waiting for cancellation {i}")
await asyncio.sleep(1)
i += 1

if context.exit_flag.is_set():
if context.exit_flag:
print("Cancelled")


workflow = CancelWorkflow()
worker = hatchet.worker("test-worker", max_runs=4)
worker = hatchet.worker("cancellation-worker", max_runs=4)
worker.register_workflow(workflow)

worker.start()
62 changes: 62 additions & 0 deletions examples/concurrency-limit-rr/test_concurrency_limit_rr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import asyncio
import time

import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker
from tests.utils.hatchet_client import hatchet_client_fixture

hatchet = hatchet_client_fixture()
worker = fixture_bg_worker(["poetry", "run", "concurrency_limit_rr"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
num_groups = 2
runs: list[WorkflowRunRef] = []

# Start all runs
for i in range(1, num_groups + 1):
run = hatchet.admin.run_workflow("ConcurrencyDemoWorkflowRR", {"group": i})
runs.append(run)
run = hatchet.admin.run_workflow("ConcurrencyDemoWorkflowRR", {"group": i})
runs.append(run)

# Wait for all results
successful_runs = []
cancelled_runs = []

start_time = time.time()

# Process each run individually
for i, run in enumerate(runs, start=1):
try:
result = await run.result()
successful_runs.append((i, result))
except Exception as e:
if "CANCELLED_BY_CONCURRENCY_LIMIT" in str(e):
cancelled_runs.append((i, str(e)))
else:
raise # Re-raise if it's an unexpected error

end_time = time.time()
total_time = end_time - start_time

# Check that we have the correct number of successful and cancelled runs
assert (
len(successful_runs) == 4
), f"Expected 4 successful runs, got {len(successful_runs)}"
assert (
len(cancelled_runs) == 0
), f"Expected 0 cancelled run, got {len(cancelled_runs)}"

# Check that the total time is close to 2 seconds
assert (
3.8 <= total_time <= 5
), f"Expected runtime to be about 4 seconds, but it took {total_time:.2f} seconds"

print(f"Total execution time: {total_time:.2f} seconds")
6 changes: 2 additions & 4 deletions examples/concurrency-limit-rr/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ class ConcurrencyDemoWorkflowRR:
)
def concurrency(self, context: Context) -> str:
input = context.workflow_input()

print(input)

return input.get("group")
return f'group-{input["group"]}'

@hatchet.step()
def step1(self, context):
print("starting step1")
time.sleep(0.2)
time.sleep(2)
print("finished step1")
pass

Expand Down
47 changes: 47 additions & 0 deletions examples/concurrency-limit/test_concurrency_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import asyncio

import pytest

from hatchet_sdk import Hatchet
from hatchet_sdk.workflow_run import WorkflowRunRef
from tests.utils import fixture_bg_worker
from tests.utils.hatchet_client import hatchet_client_fixture

hatchet = hatchet_client_fixture()
worker = fixture_bg_worker(["poetry", "run", "concurrency_limit"])


# requires scope module or higher for shared event loop
@pytest.mark.skip(reason="The timing for this test is not reliable")
@pytest.mark.asyncio(scope="session")
async def test_run(hatchet: Hatchet):
num_runs = 6
runs: list[WorkflowRunRef] = []

# Start all runs
for i in range(1, num_runs + 1):
run = hatchet.admin.run_workflow("ConcurrencyDemoWorkflow", {"run": i})
runs.append(run)

# Wait for all results
successful_runs = []
cancelled_runs = []

# Process each run individually
for i, run in enumerate(runs, start=1):
try:
result = await run.result()
successful_runs.append((i, result))
except Exception as e:
if "CANCELLED_BY_CONCURRENCY_LIMIT" in str(e):
cancelled_runs.append((i, str(e)))
else:
raise # Re-raise if it's an unexpected error

# Check that we have the correct number of successful and cancelled runs
assert (
len(successful_runs) == 5
), f"Expected 5 successful runs, got {len(successful_runs)}"
assert (
len(cancelled_runs) == 1
), f"Expected 1 cancelled run, got {len(cancelled_runs)}"
Loading
Loading