From f7929a9aaf05d0d4656f680e6fafd043601e1a68 Mon Sep 17 00:00:00 2001 From: srhinos <6531393+srhinos@users.noreply.github.com> Date: Tue, 13 Aug 2024 14:56:09 -0400 Subject: [PATCH] Logic Improvement --- hatchet_sdk/workflow_run.py | 38 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/hatchet_sdk/workflow_run.py b/hatchet_sdk/workflow_run.py index 6989132d..9fae5fa5 100644 --- a/hatchet_sdk/workflow_run.py +++ b/hatchet_sdk/workflow_run.py @@ -14,11 +14,11 @@ class EventLoopThread: def __init__(self): """ - Initializes the EventLoopThread by creating or getting an event loop + Initializes the EventLoopThread by creating an event loop and setting up a thread to run the loop. """ - self.loop: asyncio.AbstractEventLoop = self.get_or_create_event_loop() - self.thread: Thread = Thread(target=self.run_loop_in_thread, args=(self.loop,)) + self.loop = asyncio.new_event_loop() + self.thread = Thread(target=self.run_loop_in_thread, args=(self.loop,)) def __enter__(self) -> asyncio.AbstractEventLoop: """ @@ -47,20 +47,15 @@ def run_loop_in_thread(self, loop: asyncio.AbstractEventLoop) -> None: asyncio.set_event_loop(loop) loop.run_forever() - def get_or_create_event_loop(self) -> asyncio.AbstractEventLoop: - """ - Gets the current event loop or creates a new one if none exists. - Returns: - asyncio.AbstractEventLoop: The current or newly created event loop. - """ - try: - return asyncio.get_event_loop() - except RuntimeError as e: - if str(e).startswith("There is no current event loop in thread"): - return asyncio.new_event_loop() - else: - raise e +def get_or_create_event_loop(): + try: + return asyncio.get_event_loop() + except RuntimeError as e: + if str(e).startswith("There is no current event loop in thread"): + return None + else: + raise e class WorkflowRunRef: @@ -83,10 +78,13 @@ def stream(self) -> RunEventListener: return self.workflow_run_event_listener.stream(self.workflow_run_id) def result(self): - return self.workflow_listener.result(self.workflow_run_id) - - def sync_result(self): - with EventLoopThread() as loop: + loop = get_or_create_event_loop() + if loop is None: + with EventLoopThread() as loop: + coro = self.workflow_listener.result(self.workflow_run_id) + future = asyncio.run_coroutine_threadsafe(coro, loop) + return future.result() + else: coro = self.workflow_listener.result(self.workflow_run_id) future = asyncio.run_coroutine_threadsafe(coro, loop) return future.result()