Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit 0446b8b

Browse files
authored
Fix: Pause Work Assignment on Exit (#332)
* feat: pause assignment on listener shutdown * fix: lint * chore: ver
1 parent f96e571 commit 0446b8b

File tree

3 files changed

+28
-7
lines changed

3 files changed

+28
-7
lines changed

hatchet_sdk/clients/rest_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from hatchet_sdk.clients.rest.api.event_api import EventApi
1010
from hatchet_sdk.clients.rest.api.log_api import LogApi
1111
from hatchet_sdk.clients.rest.api.step_run_api import StepRunApi
12+
from hatchet_sdk.clients.rest.api.worker_api import WorkerApi
1213
from hatchet_sdk.clients.rest.api.workflow_api import WorkflowApi
1314
from hatchet_sdk.clients.rest.api.workflow_run_api import WorkflowRunApi
1415
from hatchet_sdk.clients.rest.api.workflow_runs_api import WorkflowRunsApi
@@ -83,6 +84,7 @@ def __init__(self, host: str, api_key: str, tenant_id: str):
8384
self._step_run_api = None
8485
self._event_api = None
8586
self._log_api = None
87+
self._worker_api: WorkerApi | None = None
8688

8789
@property
8890
def api_client(self):
@@ -102,6 +104,13 @@ def workflow_run_api(self):
102104
self._workflow_run_api = WorkflowRunApi(self.api_client)
103105
return self._workflow_run_api
104106

107+
@property
108+
def worker_api(self):
109+
if self._worker_api is None:
110+
self._worker_api = WorkerApi(self.api_client)
111+
112+
return self._worker_api
113+
105114
@property
106115
def step_run_api(self):
107116
if self._step_run_api is None:

hatchet_sdk/worker/action_listener_process.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88

99
import grpc
1010

11+
from hatchet_sdk.client import Client, new_client_raw
1112
from hatchet_sdk.clients.dispatcher.action_listener import Action
1213
from hatchet_sdk.clients.dispatcher.dispatcher import (
1314
ActionListener,
1415
GetActionListenerRequest,
1516
new_dispatcher,
1617
)
18+
from hatchet_sdk.clients.rest.models.update_worker_request import UpdateWorkerRequest
1719
from hatchet_sdk.contracts.dispatcher_pb2 import (
1820
GROUP_KEY_EVENT_TYPE_STARTED,
1921
STEP_EVENT_TYPE_STARTED,
@@ -41,10 +43,6 @@ class ActionEvent:
4143
)
4244

4345

44-
def noop_handler():
45-
pass
46-
47-
4846
@dataclass
4947
class WorkerActionListenerProcess:
5048
name: str
@@ -70,9 +68,15 @@ def __post_init__(self):
7068
if self.debug:
7169
logger.setLevel(logging.DEBUG)
7270

71+
self.client = new_client_raw(self.config, self.debug)
72+
7373
loop = asyncio.get_event_loop()
74-
loop.add_signal_handler(signal.SIGINT, noop_handler)
75-
loop.add_signal_handler(signal.SIGTERM, noop_handler)
74+
loop.add_signal_handler(
75+
signal.SIGINT, lambda: asyncio.create_task(self.pause_task_assignment())
76+
)
77+
loop.add_signal_handler(
78+
signal.SIGTERM, lambda: asyncio.create_task(self.pause_task_assignment())
79+
)
7680
loop.add_signal_handler(
7781
signal.SIGQUIT, lambda: asyncio.create_task(self.exit_gracefully())
7882
)
@@ -249,7 +253,15 @@ async def cleanup(self):
249253

250254
self.event_queue.put(STOP_LOOP)
251255

256+
async def pause_task_assignment(self) -> None:
257+
await self.client.rest.aio.worker_api.worker_update(
258+
worker=self.listener.worker_id,
259+
update_worker_request=UpdateWorkerRequest(isPaused=True),
260+
)
261+
252262
async def exit_gracefully(self, skip_unregister=False):
263+
await self.pause_task_assignment()
264+
253265
if self.killing:
254266
return
255267

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "hatchet-sdk"
3-
version = "0.47.0"
3+
version = "0.47.1"
44
description = ""
55
authors = ["Alexander Belanger <[email protected]>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)