Skip to content

Commit

Permalink
Logic Improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
srhinos committed Aug 13, 2024
1 parent 6647b40 commit f7929a9
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions hatchet_sdk/workflow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ class EventLoopThread:

def __init__(self):
"""
Initializes the EventLoopThread by creating or getting an event loop
Initializes the EventLoopThread by creating an event loop
and setting up a thread to run the loop.
"""
self.loop: asyncio.AbstractEventLoop = self.get_or_create_event_loop()
self.thread: Thread = Thread(target=self.run_loop_in_thread, args=(self.loop,))
self.loop = asyncio.new_event_loop()
self.thread = Thread(target=self.run_loop_in_thread, args=(self.loop,))

def __enter__(self) -> asyncio.AbstractEventLoop:
"""
Expand Down Expand Up @@ -47,20 +47,15 @@ def run_loop_in_thread(self, loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()

def get_or_create_event_loop(self) -> asyncio.AbstractEventLoop:
"""
Gets the current event loop or creates a new one if none exists.

Returns:
asyncio.AbstractEventLoop: The current or newly created event loop.
"""
try:
return asyncio.get_event_loop()
except RuntimeError as e:
if str(e).startswith("There is no current event loop in thread"):
return asyncio.new_event_loop()
else:
raise e
def get_or_create_event_loop():
try:
return asyncio.get_event_loop()
except RuntimeError as e:
if str(e).startswith("There is no current event loop in thread"):
return None
else:
raise e


class WorkflowRunRef:
Expand All @@ -83,10 +78,13 @@ def stream(self) -> RunEventListener:
return self.workflow_run_event_listener.stream(self.workflow_run_id)

def result(self):
return self.workflow_listener.result(self.workflow_run_id)

def sync_result(self):
with EventLoopThread() as loop:
loop = get_or_create_event_loop()
if loop is None:
with EventLoopThread() as loop:
coro = self.workflow_listener.result(self.workflow_run_id)
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()
else:
coro = self.workflow_listener.result(self.workflow_run_id)
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()
Expand Down

0 comments on commit f7929a9

Please sign in to comment.