Patch GRPC BlockingIO Error #177

Open · wants to merge 7 commits into base: main
13 changes: 9 additions & 4 deletions examples/cancellation/worker.py
@@ -23,8 +23,13 @@ async def step1(self, context: Context):
         print("Cancelled")


-workflow = CancelWorkflow()
-worker = hatchet.worker("cancellation-worker", max_runs=4)
-worker.register_workflow(workflow)
+def main():
+    workflow = CancelWorkflow()
+    worker = hatchet.worker("cancellation-worker", max_runs=4)
+    worker.register_workflow(workflow)

-worker.start()
+    worker.start()
+
+
+if __name__ == "__main__":
+    main()
13 changes: 9 additions & 4 deletions examples/concurrency_limit_rr/worker.py
@@ -27,8 +27,13 @@ def step1(self, context):
         pass


-workflow = ConcurrencyDemoWorkflowRR()
-worker = hatchet.worker("concurrency-demo-worker-rr", max_runs=10)
-worker.register_workflow(workflow)
+def main():
+    workflow = ConcurrencyDemoWorkflowRR()
+    worker = hatchet.worker("concurrency-demo-worker-rr", max_runs=10)
+    worker.register_workflow(workflow)

-worker.start()
+    worker.start()
+
+
+if __name__ == "__main__":
+    main()
13 changes: 9 additions & 4 deletions examples/delayed/worker.py
@@ -32,8 +32,13 @@ def step1(self, context: Context):
         print(f"message \t {context.workflow_input()['message']}")


-worker = hatchet.worker("delayed-worker", max_runs=4)
-worker.register_workflow(PrintSchedule())
-worker.register_workflow(PrintPrinter())
+def main():
+    worker = hatchet.worker("delayed-worker", max_runs=4)
+    worker.register_workflow(PrintSchedule())
+    worker.register_workflow(PrintPrinter())

-worker.start()
+    worker.start()
+
+
+if __name__ == "__main__":
+    main()
13 changes: 9 additions & 4 deletions examples/manual_trigger/worker.py
@@ -47,8 +47,13 @@ def step2(self, context):
         return {"step2": "data2"}


-workflow = ManualTriggerWorkflow()
-worker = hatchet.worker("manual-worker", max_runs=4)
-worker.register_workflow(workflow)
+def main():
+    workflow = ManualTriggerWorkflow()
+    worker = hatchet.worker("manual-worker", max_runs=4)
+    worker.register_workflow(workflow)

-worker.start()
+    worker.start()
+
+
+if __name__ == "__main__":
+    main()
13 changes: 9 additions & 4 deletions examples/overrides/worker.py
@@ -69,8 +69,13 @@ def step4(self, context: Context):
         }


-workflow = OverridesWorkflow()
-worker = hatchet.worker("overrides-worker")
-worker.register_workflow(workflow)
+def main():
+    workflow = OverridesWorkflow()
+    worker = hatchet.worker("overrides-worker")
+    worker.register_workflow(workflow)

-worker.start()
+    worker.start()
+
+
+if __name__ == "__main__":
+    main()
4 changes: 4 additions & 0 deletions examples/rate_limit/worker.py
@@ -24,3 +24,7 @@ def main():
     worker.register_workflow(RateLimitWorkflow())

     worker.start()
+
+
+if __name__ == "__main__":
+    main()
13 changes: 9 additions & 4 deletions examples/sticky-workers/worker.py
@@ -36,7 +36,12 @@ def child(self, context: Context):
         return {"worker": context.worker.id()}


-worker = hatchet.worker("sticky-worker", max_runs=10)
-worker.register_workflow(StickyWorkflow())
-worker.register_workflow(StickyChildWorkflow())
-worker.start()
+def main():
+    worker = hatchet.worker("sticky-worker", max_runs=10)
+    worker.register_workflow(StickyWorkflow())
+    worker.register_workflow(StickyChildWorkflow())
+    worker.start()
+
+
+if __name__ == "__main__":
+    main()
14 changes: 5 additions & 9 deletions hatchet_sdk/clients/dispatcher/action_listener.py
@@ -22,6 +22,7 @@
 )
 from hatchet_sdk.contracts.dispatcher_pb2_grpc import DispatcherStub
 from hatchet_sdk.logger import logger
+from hatchet_sdk.utils.aio_utils import create_new_event_loop, get_active_event_loop
 from hatchet_sdk.utils.backoff import exp_backoff_sleep

 from ...loader import ClientConfig
@@ -184,15 +185,10 @@ async def heartbeat(self):
     async def start_heartbeater(self):
         if self.heartbeat_task is not None:
             return
-
-        try:
-            loop = asyncio.get_event_loop()
-        except RuntimeError as e:
-            if str(e).startswith("There is no current event loop in thread"):
-                loop = asyncio.new_event_loop()
-                asyncio.set_event_loop(loop)
-            else:
-                raise e
+        loop = get_active_event_loop(should_raise=False)
+        if not loop:
+            loop = create_new_event_loop()
+            asyncio.set_event_loop(loop)
         self.heartbeat_task = loop.create_task(self.heartbeat())

     def __aiter__(self):
4 changes: 3 additions & 1 deletion hatchet_sdk/clients/event_ts.py
@@ -1,6 +1,8 @@
 import asyncio
 from typing import Any

+from hatchet_sdk.utils.aio_utils import get_active_event_loop
+

 class Event_ts(asyncio.Event):
     """
@@ -10,7 +12,7 @@ class Event_ts(asyncio.Event):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         if self._loop is None:
-            self._loop = asyncio.get_event_loop()
+            self._loop = get_active_event_loop()

     def set(self):
         if not self._loop.is_closed():
3 changes: 2 additions & 1 deletion hatchet_sdk/clients/rest_client.py
@@ -42,6 +42,7 @@
     WorkflowRunsCancelRequest,
 )
 from hatchet_sdk.clients.rest.models.workflow_version import WorkflowVersion
+from hatchet_sdk.utils.aio_utils import create_new_event_loop


 class AsyncRestApi:
@@ -246,7 +247,7 @@ async def events_replay(self, event_ids: list[str] | EventList) -> EventList:

 class RestApi:
     def __init__(self, host: str, api_key: str, tenant_id: str):
-        self._loop = asyncio.new_event_loop()
+        self._loop = create_new_event_loop()
         self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
         self._thread.start()

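The switch to create_new_event_loop matters here because RestApi drives its own event loop on a background daemon thread, which is exactly the multiple-loops-in-separate-threads situation that triggers the spurious grpc BlockingIOError this PR suppresses. A minimal sketch of that pattern, assuming a _run_event_loop that simply pins the loop to the thread and runs it forever (that method is not part of this diff, and the class name is illustrative):

    import asyncio
    import threading

    from hatchet_sdk.utils.aio_utils import create_new_event_loop


    class BackgroundLoop:
        """Illustrative stand-in for how RestApi drives its private loop."""

        def __init__(self):
            # The new helper installs the patched exception handler on the loop.
            self._loop = create_new_event_loop()
            self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
            self._thread.start()

        def _run_event_loop(self):
            asyncio.set_event_loop(self._loop)
            self._loop.run_forever()

        def run(self, coro):
            # Submit a coroutine from the calling thread and block for the result.
            return asyncio.run_coroutine_threadsafe(coro, self._loop).result()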
57 changes: 52 additions & 5 deletions hatchet_sdk/utils/aio_utils.py
@@ -55,7 +55,7 @@ async def run(*args, loop=None, executor=None, **kwargs):
             The result of the function call.
         """
         if loop is None:
-            loop = asyncio.get_running_loop()
+            loop = get_active_event_loop()

         if inspect.iscoroutinefunction(func):
             # Wrap the coroutine to run it in an executor
@@ -80,7 +80,7 @@ def __init__(self):
         Initializes the EventLoopThread by creating an event loop
         and setting up a thread to run the loop.
         """
-        self.loop = asyncio.new_event_loop()
+        self.loop = create_new_event_loop()
        self.thread = Thread(target=self.run_loop_in_thread, args=(self.loop,))

     def __enter__(self) -> asyncio.AbstractEventLoop:
@@ -111,7 +111,7 @@ def run_loop_in_thread(self, loop: asyncio.AbstractEventLoop) -> None:
         loop.run_forever()


-def get_active_event_loop() -> asyncio.AbstractEventLoop | None:
+def get_active_event_loop(should_raise=True) -> asyncio.AbstractEventLoop | None:
     """
     Get the active event loop.

@@ -120,9 +120,56 @@ def get_active_event_loop() -> asyncio.AbstractEventLoop | None:
         event loop in the current thread.
     """
     try:
-        return asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
+        patch_exception_handler(loop)
+        return loop
     except RuntimeError as e:
-        if str(e).startswith("There is no current event loop in thread"):
+        if (
+            any(
+                substring in str(e)
+                for substring in [
+                    "There is no current event loop in thread",
+                    "no running event loop",
+                ]
+            )
+            and not should_raise
+        ):
             return None
         else:
             raise e
+
+
+def create_new_event_loop() -> asyncio.AbstractEventLoop | None:
+    """
+    Create a new event loop.
+
+    Returns:
+        asyncio.AbstractEventLoop: The new event loop.
+    """
+    loop = asyncio.new_event_loop()
+    patch_exception_handler(loop)
+    return loop
+
+
+def patch_exception_handler(loop: asyncio.AbstractEventLoop) -> None:
+    """
+    Patch the asyncio exception handler to ignore `BlockingIOError: [Errno 35] Resource temporarily unavailable`
+    errors caused by `aio.grpc` when using multiple event loops in separate threads.
+
+    This error arises from a Cython implementation detail in `aio.Channel.__init__`, where a `socket.recv(1)` call
+    succeeds only on the first invocation. Subsequent calls result in the mentioned error, but this does not
+    impact the functionality of the library and can be safely ignored.
+
+    References:
+    - https://github.com/grpc/grpc/issues/25364
+    - https://github.com/grpc/grpc/pull/36096
+    """
+
+    def exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
+        if "exception" in context:
+            err = f"{type(context['exception']).__name__}: {context['exception']}"
+            if err == "BlockingIOError: [Errno 35] Resource temporarily unavailable":
+                return
+        loop.default_exception_handler(context)
+
+    loop.set_exception_handler(exception_handler)
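Taken together, the helpers are intended to be used as a get-or-create pair: reuse the running loop when one exists, otherwise create a fresh one, with the patched exception handler installed either way. A minimal sketch of that usage, assuming a caller that may or may not be inside a running loop (ensure_loop is an illustrative name, not part of the diff):

    import asyncio

    from hatchet_sdk.utils.aio_utils import create_new_event_loop, get_active_event_loop


    def ensure_loop() -> asyncio.AbstractEventLoop:
        # With should_raise=False this returns None instead of raising when
        # no loop is running in the current thread.
        loop = get_active_event_loop(should_raise=False)
        if loop is None:
            # create_new_event_loop() also installs the handler that drops the
            # spurious "BlockingIOError: [Errno 35]" raised via grpc.aio channels.
            loop = create_new_event_loop()
            asyncio.set_event_loop(loop)
        return loop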
19 changes: 11 additions & 8 deletions hatchet_sdk/worker/action_listener_process.py
@@ -4,6 +4,7 @@
 import time
 from dataclasses import dataclass, field
 from multiprocessing import Queue
+from multiprocessing.synchronize import Event as EventClass
 from typing import Any, List, Mapping, Optional

 import grpc
@@ -21,6 +22,7 @@
 )
 from hatchet_sdk.loader import ClientConfig
 from hatchet_sdk.logger import logger
+from hatchet_sdk.utils.aio_utils import get_active_event_loop, patch_exception_handler
 from hatchet_sdk.utils.backoff import exp_backoff_sleep

 ACTION_EVENT_RETRY_COUNT = 5
@@ -70,12 +72,11 @@ def __post_init__(self):
         if self.debug:
             logger.setLevel(logging.DEBUG)

-        loop = asyncio.get_event_loop()
+        loop = get_active_event_loop()
+        patch_exception_handler(loop)
         loop.add_signal_handler(signal.SIGINT, noop_handler)
         loop.add_signal_handler(signal.SIGTERM, noop_handler)
-        loop.add_signal_handler(
-            signal.SIGQUIT, lambda: asyncio.create_task(self.exit_gracefully())
-        )
+        loop.add_signal_handler(signal.SIGQUIT, noop_handler)

     async def start(self, retry_attempt=0):
         if retry_attempt > 5:
@@ -111,15 +112,15 @@ async def start(self, retry_attempt=0):

     # TODO move event methods to separate class
     async def _get_event(self):
-        loop = asyncio.get_running_loop()
+        loop = get_active_event_loop()
         return await loop.run_in_executor(None, self.event_queue.get)

     async def start_event_send_loop(self):
         while True:
             event: ActionEvent = await self._get_event()
             if event == STOP_LOOP:
                 logger.debug("stopping event send loop...")
-                break
+                return

             logger.debug(f"tx: event: {event.action.action_id}/{event.type}")
             asyncio.create_task(self.send_event(event))
@@ -248,6 +249,7 @@ async def cleanup(self):
         self.listener.cleanup()

         self.event_queue.put(STOP_LOOP)
+        await asyncio.sleep(1)

     async def exit_gracefully(self, skip_unregister=False):
         if self.killing:
@@ -267,12 +269,13 @@ def exit_forcefully(self):
     logger.debug("forcefully closing listener...")


-def worker_action_listener_process(*args, **kwargs):
+def worker_action_listener_process(stop_event: EventClass, *args, **kwargs):
     async def run():
         process = WorkerActionListenerProcess(*args, **kwargs)
         await process.start()
         # Keep the process running
-        while not process.killing:
+        while not stop_event.is_set():
             await asyncio.sleep(0.1)
+        await process.exit_gracefully()

     asyncio.run(run())
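worker_action_listener_process now exits when a multiprocessing Event is set, rather than by polling process.killing. A sketch of how a parent process could wire that up; the spawn context, the empty kwargs placeholder, and the variable names are illustrative assumptions, since the worker-side wiring is not shown in this diff:

    import multiprocessing

    from hatchet_sdk.worker.action_listener_process import worker_action_listener_process

    ctx = multiprocessing.get_context("spawn")
    stop_event = ctx.Event()

    # Placeholder for the listener's real configuration (name, token, queues, ...).
    listener_kwargs: dict = {}

    listener_proc = ctx.Process(
        target=worker_action_listener_process,
        args=(stop_event,),  # the new first positional argument
        kwargs=listener_kwargs,
    )
    listener_proc.start()

    # During shutdown, setting the event lets run() fall out of its sleep loop
    # and call exit_gracefully() inside the child before the process ends.
    stop_event.set()
    listener_proc.join()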
12 changes: 5 additions & 7 deletions hatchet_sdk/worker/runner/run_loop_manager.py
@@ -1,4 +1,5 @@
 import asyncio
+import concurrent
 import logging
 from dataclasses import dataclass, field
 from multiprocessing import Queue
@@ -8,6 +9,7 @@
 from hatchet_sdk.clients.dispatcher.action_listener import Action
 from hatchet_sdk.loader import ClientConfig
 from hatchet_sdk.logger import logger
+from hatchet_sdk.utils.aio_utils import get_active_event_loop
 from hatchet_sdk.worker.runner.runner import Runner
 from hatchet_sdk.worker.runner.utils.capture_logs import capture_logs

@@ -50,7 +52,7 @@ async def async_start(self, retry_count=1):

     async def _async_start(self, retry_count=1):
         logger.info("starting runner...")
-        self.loop = asyncio.get_running_loop()
+        self.loop = get_active_event_loop()
         k = self.loop.create_task(self._start_action_loop())

     def cleanup(self):
@@ -84,7 +86,8 @@ async def _start_action_loop(self):
         logger.debug("action runner loop stopped")

     async def _get_action(self):
-        return await self.loop.run_in_executor(None, self.action_queue.get)
+        with concurrent.futures.ThreadPoolExecutor() as pool:
+            return await self.loop.run_in_executor(pool, self.action_queue.get)

     async def exit_gracefully(self):
         if self.killing:
@@ -94,11 +97,6 @@ async def exit_gracefully(self):

         self.cleanup()

-        # Wait for 1 second to allow last calls to flush. These are calls which have been
-        # added to the event loop as callbacks to tasks, so we're not aware of them in the
-        # task list.
-        await asyncio.sleep(1)
-
     def exit_forcefully(self):
         logger.info("forcefully exiting runner...")
         self.cleanup()