Skip to content

Commit

Permalink
Merge pull request #31 from hatchet-dev/belanger/fix-admin-aio
Browse files Browse the repository at this point in the history
fix: admin aio client config
  • Loading branch information
abelanger5 authored May 31, 2024
2 parents c1aa845 + 474b581 commit e3c2f96
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 12 deletions.
1 change: 1 addition & 0 deletions hatchet_sdk/clients/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def _prepare_schedule_workflow_request(
class AdminClientAioImpl(AdminClientBase):
def __init__(self, config: ClientConfig):
aio_conn = new_conn(config, True)
self.config = config
self.aio_client = WorkflowServiceStub(aio_conn)
self.token = config.token
self.listener_client = new_listener(config)
Expand Down
18 changes: 9 additions & 9 deletions hatchet_sdk/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,23 @@ def __init__(
admin_client: AdminClientImpl,
event_client: EventClientImpl,
workflow_listener: PooledWorkflowRunListener,
workflow_run_event_listener: RunEventListenerClient,
namespace: str = "",
):
self.action = action
self.dispatcher_client = dispatcher_client
self.admin_client = admin_client
self.event_client = event_client
self.workflow_listener = workflow_listener
self.workflow_run_event_listener = workflow_run_event_listener
self.namespace = namespace
self.spawn_index = -1

async def spawn_workflow(
self, workflow_name: str, input: dict = {}, key: str = None
):
) -> WorkflowRunRef:
options = self._prepare_workflow_options(key)
child_workflow_run_id = await self.admin_client.aio.run_workflow(
workflow_name, input, options
)
return WorkflowRunRef(child_workflow_run_id, self.workflow_listener)
return await self.admin_client.aio.run_workflow(workflow_name, input, options)


class Context(BaseContext):
Expand All @@ -78,6 +77,7 @@ def __init__(
admin_client: AdminClientImpl,
event_client: EventClientImpl,
workflow_listener: PooledWorkflowRunListener,
workflow_run_event_listener: RunEventListenerClient,
namespace: str = "",
):
self.aio = ContextAioImpl(
Expand All @@ -86,6 +86,7 @@ def __init__(
admin_client,
event_client,
workflow_listener,
workflow_run_event_listener,
namespace,
)

Expand All @@ -110,6 +111,7 @@ def __init__(
self.admin_client = admin_client
self.event_client = event_client
self.workflow_listener = workflow_listener
self.workflow_run_event_listener = workflow_run_event_listener
self.namespace = namespace

# FIXME: this limits the number of concurrent log requests to 1, which means we can do about
Expand Down Expand Up @@ -171,10 +173,8 @@ def spawn_workflow(self, workflow_name: str, input: dict = {}, key: str = None):
workflow_name = f"{self.namespace}{workflow_name}"

options = self._prepare_workflow_options(key)
child_workflow_run_id = self.admin_client.run_workflow(
workflow_name, input, options
)
return WorkflowRunRef(child_workflow_run_id, self.workflow_listener)

return self.admin_client.run_workflow(workflow_name, input, options)

def _log(self, line: str):
try:
Expand Down
4 changes: 4 additions & 0 deletions hatchet_sdk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from google.protobuf.timestamp_pb2 import Timestamp

from hatchet_sdk.clients.admin import new_admin
from hatchet_sdk.clients.run_event_listener import new_listener
from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
from hatchet_sdk.loader import ClientConfig

Expand Down Expand Up @@ -153,6 +154,7 @@ async def handle_start_step_run(self, action: Action):
self.admin_client,
self.client.event,
self.client.workflow_listener,
self.workflow_run_event_listener,
self.client.config.namespace,
)
self.contexts[action.step_run_id] = context
Expand Down Expand Up @@ -193,6 +195,7 @@ async def handle_start_group_key_run(self, action: Action):
self.admin_client,
self.client.event,
self.client.workflow_listener,
self.workflow_run_event_listener,
self.client.config.namespace,
)

Expand Down Expand Up @@ -482,6 +485,7 @@ async def async_start(self, retry_count=1):
# otherwise the grpc.aio methods will use a different event loop and we'll get a bunch of errors.
self.dispatcher_client = new_dispatcher(self.config)
self.admin_client = new_admin(self.config)
self.workflow_run_event_listener = new_listener(self.config)
self.client.workflow_listener = PooledWorkflowRunListener(self.config)

self.listener: ActionListenerImpl = (
Expand Down
4 changes: 2 additions & 2 deletions hatchet_sdk/workflow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ def __str__(self):
def stream(self) -> RunEventListener:
return self.workflow_run_event_listener.stream(self.workflow_run_id)

async def result(self):
return await self.workflow_listener.result(self.workflow_run_id)
def result(self):
return self.workflow_listener.result(self.workflow_run_id)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "0.25.1"
version = "0.25.2"
description = ""
authors = ["Alexander Belanger <[email protected]>"]
readme = "README.md"
Expand Down

0 comments on commit e3c2f96

Please sign in to comment.