Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit f96e571

Browse files
authored
0.47.0 (#324)
* Fix: More OTel Cleanup (#319) * feat: factor out helpers * chore: bump version * fix: replace none-default * fix: auto-create parent * feat: docs * fix: doc format * fix: example * feat: use global tracer if none provided * fix: docs * Fix: Bump min `celpy` version (#320) * fix: bump min celpy version * chore: ver * Feat: Sync spawn methods + fixing `sync_result` (#321) * feat: add sync spawn method * feat: example * fix: sync workflow run + sync_result * fix: examples * fix: lint * feat: bulk spawn * chore: ver * fix: test * fix: lint * Feat: Killing sync threads (#323) * feat: thread killing * fix: rm events * fix: lint * fix: cruft * fix: async sleep * feat: flag for enabling force killing threads
1 parent 1a7164d commit f96e571

File tree

14 files changed

+326
-88
lines changed

14 files changed

+326
-88
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import pytest

from hatchet_sdk import Hatchet, Worker


@pytest.mark.parametrize("worker", ["fanout_sync"], indirect=True)
def test_run(hatchet: Hatchet, worker: Worker) -> None:
    """End-to-end check: the sync fanout parent spawns two children and
    ``sync_result`` returns both child results under the ``spawn`` step."""
    ref = hatchet.admin.run_workflow("SyncFanoutParent", {"n": 2})
    spawn_results = ref.sync_result()["spawn"]["results"]
    assert len(spawn_results) == 2

examples/fanout_sync/trigger.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from dotenv import load_dotenv

from hatchet_sdk import new_client


def main() -> None:
    """Trigger the sync fanout parent workflow once, tagging the run with
    extra metadata.

    ``run_workflow`` is a blocking (synchronous) call, so this example does
    not need an event loop; the previous ``async def main`` +
    ``asyncio.run(...)`` wrapper added noise without awaiting anything.
    """
    load_dotenv()
    hatchet = new_client()

    # NOTE(review): the parent step reads workflow_input()["n"]; this payload
    # does not set "n", so the parent falls back to its default fanout count.
    hatchet.admin.run_workflow(
        "SyncFanoutParent",
        {"test": "test"},
        options={"additional_metadata": {"hello": "moon"}},
    )


if __name__ == "__main__":
    main()

examples/fanout_sync/worker.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from typing import Any
2+
3+
from dotenv import load_dotenv
4+
5+
from hatchet_sdk import Context, Hatchet
6+
from hatchet_sdk.workflow_run import WorkflowRunRef
7+
8+
load_dotenv()
9+
10+
hatchet = Hatchet(debug=True)
11+
12+
13+
@hatchet.workflow(on_events=["parent:create"])
class SyncFanoutParent:
    """Fanout parent: spawns N child workflows and blocks until all finish."""

    @hatchet.step(timeout="5m")
    def spawn(self, context: Context) -> dict[str, Any]:
        """Bulk-spawn ``n`` SyncFanoutChild runs (default 5) and gather
        their results synchronously."""
        print("spawning child")

        child_count = context.workflow_input().get("n", 5)

        bulk_specs = [
            {
                "workflow_name": "SyncFanoutChild",
                "input": {"a": str(index)},
                "key": f"child{index}",
                "options": {"additional_metadata": {"hello": "earth"}},
            }
            for index in range(child_count)
        ]
        refs: list[WorkflowRunRef] = context.spawn_workflows(bulk_specs)

        # Block until every child run has completed.
        results = [ref.sync_result() for ref in refs]

        print(f"results {results}")

        return {"results": results}
38+
39+
40+
@hatchet.workflow(on_events=["child:create"])
class SyncFanoutChild:
    """Fanout child: echoes its input index back as a success status."""

    @hatchet.step()
    def process(self, context: Context) -> dict[str, str]:
        """Return a status string built from the ``a`` field of the input."""
        index = context.workflow_input()["a"]
        return {"status": "success " + index}
45+
46+
47+
def main() -> None:
    """Start a worker hosting both fanout workflows (blocks forever)."""
    worker = hatchet.worker("sync-fanout-worker", max_runs=40)
    for workflow in (SyncFanoutParent(), SyncFanoutChild()):
        worker.register_workflow(workflow)
    worker.start()


if __name__ == "__main__":
    main()

examples/opentelemetry_instrumentation/test_otel_instrumentation.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
from hatchet_sdk import Hatchet, Worker
77
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
88
from hatchet_sdk.clients.events import PushEventOptions
9-
from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor
9+
from hatchet_sdk.opentelemetry.instrumentor import (
10+
HatchetInstrumentor,
11+
create_traceparent,
12+
inject_traceparent_into_metadata,
13+
)
1014

1115
trace_provider = NoOpTracerProvider()
1216

@@ -17,9 +21,7 @@
1721

1822

1923
def create_additional_metadata() -> dict[str, str]:
20-
return instrumentor.inject_traceparent_into_metadata(
21-
{"hello": "world"}, instrumentor.create_traceparent()
22-
)
24+
return inject_traceparent_into_metadata({"hello": "world"})
2325

2426

2527
def create_push_options() -> PushEventOptions:

examples/opentelemetry_instrumentation/triggers.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@
44
from examples.opentelemetry_instrumentation.tracer import trace_provider
55
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
66
from hatchet_sdk.clients.events import PushEventOptions
7-
from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor
7+
from hatchet_sdk.opentelemetry.instrumentor import (
8+
HatchetInstrumentor,
9+
create_traceparent,
10+
inject_traceparent_into_metadata,
11+
)
812

913
instrumentor = HatchetInstrumentor(tracer_provider=trace_provider)
1014
tracer = trace_provider.get_tracer(__name__)
1115

1216

1317
def create_additional_metadata() -> dict[str, str]:
14-
return instrumentor.inject_traceparent_into_metadata(
15-
{"hello": "world"}, instrumentor.create_traceparent()
16-
)
18+
return inject_traceparent_into_metadata({"hello": "world"})
1719

1820

1921
def create_push_options() -> PushEventOptions:

hatchet_sdk/clients/workflow_listener.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ class PooledWorkflowRunListener:
7575
interrupter: asyncio.Task = None
7676

7777
def __init__(self, config: ClientConfig):
78+
try:
79+
asyncio.get_running_loop()
80+
except RuntimeError:
81+
loop = asyncio.new_event_loop()
82+
asyncio.set_event_loop(loop)
83+
7884
conn = new_conn(config, True)
7985
self.client = DispatcherStub(conn)
8086
self.token = config.token
@@ -260,12 +266,10 @@ async def _retry_subscribe(self):
260266
if self.curr_requester != 0:
261267
self.requests.put_nowait(self.curr_requester)
262268

263-
listener = self.client.SubscribeToWorkflowRuns(
269+
return self.client.SubscribeToWorkflowRuns(
264270
self._request(),
265271
metadata=get_metadata(self.token),
266272
)
267-
268-
return listener
269273
except grpc.RpcError as e:
270274
if e.code() == grpc.StatusCode.UNAVAILABLE:
271275
retries = retries + 1

hatchet_sdk/context/context.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,44 @@ def fetch_run_failures(self) -> list[dict[str, StrictStr]]:
403403
for step_run in job_run.step_runs
404404
if step_run.error and step_run.step
405405
]
406+
407+
@tenacity_retry
def spawn_workflow(
    self,
    workflow_name: str,
    input: dict[str, Any] | None = None,
    key: str | None = None,
    options: ChildTriggerWorkflowOptions | None = None,
) -> WorkflowRunRef:
    """Synchronously trigger a single child workflow from this step's context.

    Args:
        workflow_name: Registered name of the child workflow to run.
        input: Input payload for the child run; defaults to an empty dict.
        key: Optional child key for the spawned run.
        options: Optional child trigger options (e.g. additional metadata).

    Returns:
        A reference to the spawned workflow run.
    """
    # A mutable default argument ({}) would be shared across all calls;
    # normalize None -> fresh dict here instead. Callers are unaffected.
    if input is None:
        input = {}

    worker_id = self.worker.id()
    trigger_options = self._prepare_workflow_options(key, options, worker_id)

    return self.admin_client.run_workflow(workflow_name, input, trigger_options)
419+
420+
@tenacity_retry
def spawn_workflows(
    self, child_workflow_runs: list[ChildWorkflowRunDict]
) -> list[WorkflowRunRef]:
    """Synchronously trigger a batch of child workflows in one bulk request.

    Args:
        child_workflow_runs: One spec per child, each with ``workflow_name``,
            ``input``, and optional ``key`` / ``options`` entries.

    Returns:
        One run reference per spawned child, in input order.

    Raises:
        ValueError: If ``child_workflow_runs`` is empty.
    """
    if not child_workflow_runs:
        # ValueError is more precise than a bare Exception and is still
        # caught by any existing caller handling Exception.
        raise ValueError("no child workflows to spawn")

    worker_id = self.worker.id()

    bulk_trigger_workflow_runs = [
        WorkflowRunDict(
            workflow_name=run["workflow_name"],
            input=run["input"],
            options=self._prepare_workflow_options(
                run.get("key"), run.get("options", {}), worker_id
            ),
        )
        for run in child_workflow_runs
    ]

    return self.admin_client.run_workflows(bulk_trigger_workflow_runs)

hatchet_sdk/loader.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def __init__(
4242
worker_healthcheck_port: int | None = None,
4343
worker_healthcheck_enabled: bool | None = None,
4444
worker_preset_labels: dict[str, str] = {},
45+
enable_force_kill_sync_threads: bool = False,
4546
):
4647
self.tenant_id = tenant_id
4748
self.tls_config = tls_config
@@ -55,6 +56,7 @@ def __init__(
5556
self.worker_healthcheck_port = worker_healthcheck_port
5657
self.worker_healthcheck_enabled = worker_healthcheck_enabled
5758
self.worker_preset_labels = worker_preset_labels
59+
self.enable_force_kill_sync_threads = enable_force_kill_sync_threads
5860

5961
if not self.logInterceptor:
6062
self.logInterceptor = getLogger()
@@ -174,6 +176,14 @@ def get_config_value(key, env_var):
174176
"The `otel_exporter_otlp_*` fields are no longer supported as of SDK version `0.46.0`. Please see the documentation on OpenTelemetry at https://docs.hatchet.run/home/features/opentelemetry for more information on how to migrate to the new `HatchetInstrumentor`."
175177
)
176178

179+
enable_force_kill_sync_threads = bool(
180+
get_config_value(
181+
"enable_force_kill_sync_threads",
182+
"HATCHET_CLIENT_ENABLE_FORCE_KILL_SYNC_THREADS",
183+
)
184+
== "True"
185+
or False
186+
)
177187
return ClientConfig(
178188
tenant_id=tenant_id,
179189
tls_config=tls_config,
@@ -188,6 +198,7 @@ def get_config_value(key, env_var):
188198
worker_healthcheck_port=worker_healthcheck_port,
189199
worker_healthcheck_enabled=worker_healthcheck_enabled,
190200
worker_preset_labels=worker_preset_labels,
201+
enable_force_kill_sync_threads=enable_force_kill_sync_threads,
191202
)
192203

193204
def _load_tls_config(self, tls_data: Dict, host_port) -> ClientTLSConfig:

0 commit comments

Comments
 (0)