From 2f9a28b7a95fac3822ead7346de4d42674dc98f0 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Thu, 5 Sep 2024 18:29:24 -0400 Subject: [PATCH 1/3] feat: expose namespace option override --- examples/dag/event.py | 4 ++-- hatchet | 2 +- hatchet_sdk/clients/admin.py | 25 +++++++++++++++---------- hatchet_sdk/clients/events.py | 8 +++++++- pyproject.toml | 2 +- 5 files changed, 26 insertions(+), 15 deletions(-) diff --git a/examples/dag/event.py b/examples/dag/event.py index 96953908..ba6f881e 100644 --- a/examples/dag/event.py +++ b/examples/dag/event.py @@ -6,5 +6,5 @@ hatchet = Hatchet(debug=True) -for i in range(10): - hatchet.event.push("dag:create", {"test": "test"}) +# for i in range(10): +hatchet.event.push("dag:create", {"test": "test"}) diff --git a/hatchet b/hatchet index 33d7ed47..67357cfa 160000 --- a/hatchet +++ b/hatchet @@ -1 +1 @@ -Subproject commit 33d7ed471e316210d8932a6ee96c7b26a5756695 +Subproject commit 67357cfa64979260c925edc6c8c91b016f695450 diff --git a/hatchet_sdk/clients/admin.py b/hatchet_sdk/clients/admin.py index 35d1715b..74476b5e 100644 --- a/hatchet_sdk/clients/admin.py +++ b/hatchet_sdk/clients/admin.py @@ -46,11 +46,7 @@ class ChildTriggerWorkflowOptions(TypedDict): class TriggerWorkflowOptions(ScheduleTriggerWorkflowOptions, TypedDict): additional_metadata: Dict[str, str] | None = None desired_worker_id: str | None = None - - -class TriggerWorkflowOptions(ScheduleTriggerWorkflowOptions, TypedDict): - additional_metadata: Dict[str, str] | None = None - desired_worker_id: str | None = None + namespace: str | None = None class DedupeViolationErr(Exception): @@ -177,9 +173,13 @@ async def run_workflow( if not self.pooled_workflow_listener: self.pooled_workflow_listener = PooledWorkflowRunListener(self.config) - # if workflow_name does not start with namespace, prepend it - if self.namespace != "" and not workflow_name.startswith(self.namespace): - workflow_name = f"{self.namespace}{workflow_name}" + namespace = self.namespace + + if options is not None and "namespace" in options and options["namespace"] is not None: + namespace = options["namespace"] + + if namespace != "" and not workflow_name.startswith(self.namespace): + workflow_name = f"{namespace}{workflow_name}" request = self._prepare_workflow_request(workflow_name, input, options) resp: TriggerWorkflowResponse = await self.aio_client.TriggerWorkflow( @@ -336,8 +336,13 @@ def run_workflow( if not self.pooled_workflow_listener: self.pooled_workflow_listener = PooledWorkflowRunListener(self.config) - if self.namespace != "" and not workflow_name.startswith(self.namespace): - workflow_name = f"{self.namespace}{workflow_name}" + namespace = self.namespace + + if options is not None and "namespace" in options and options["namespace"] is not None: + namespace = options["namespace"] + + if namespace != "" and not workflow_name.startswith(self.namespace): + workflow_name = f"{namespace}{workflow_name}" request = self._prepare_workflow_request(workflow_name, input, options) resp: TriggerWorkflowResponse = self.client.TriggerWorkflow( diff --git a/hatchet_sdk/clients/events.py b/hatchet_sdk/clients/events.py index b4dc1c7d..56a5ccf1 100644 --- a/hatchet_sdk/clients/events.py +++ b/hatchet_sdk/clients/events.py @@ -35,6 +35,7 @@ def proto_timestamp_now(): class PushEventOptions(TypedDict): additional_metadata: Dict[str, str] | None = None + namespace: str | None = None class EventClient: @@ -46,7 +47,12 @@ def __init__(self, client: EventsServiceStub, config: ClientConfig): @tenacity_retry def push(self, event_key, payload, options: PushEventOptions = None) -> Event: - namespaced_event_key = self.namespace + event_key + namespace = self.namespace + + if options is not None and "namespace" in options and options["namespace"] is not None: + namespace = options["namespace"] + + namespaced_event_key = namespace + event_key try: meta = None if options is None else options["additional_metadata"] diff --git a/pyproject.toml b/pyproject.toml index dfe70318..52036e38 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "0.36.15" +version = "0.36.16" description = "" authors = ["Alexander Belanger "] readme = "README.md" From 1aa0c2b8770777dc67466253f7f55a5dccc52646 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Thu, 5 Sep 2024 18:30:59 -0400 Subject: [PATCH 2/3] lint --- hatchet_sdk/clients/admin.py | 12 ++++++++++-- hatchet_sdk/clients/events.py | 6 +++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/hatchet_sdk/clients/admin.py b/hatchet_sdk/clients/admin.py index 74476b5e..f03f5da2 100644 --- a/hatchet_sdk/clients/admin.py +++ b/hatchet_sdk/clients/admin.py @@ -175,7 +175,11 @@ async def run_workflow( namespace = self.namespace - if options is not None and "namespace" in options and options["namespace"] is not None: + if ( + options is not None + and "namespace" in options + and options["namespace"] is not None + ): namespace = options["namespace"] if namespace != "" and not workflow_name.startswith(self.namespace): @@ -338,7 +342,11 @@ def run_workflow( namespace = self.namespace - if options is not None and "namespace" in options and options["namespace"] is not None: + if ( + options is not None + and "namespace" in options + and options["namespace"] is not None + ): namespace = options["namespace"] if namespace != "" and not workflow_name.startswith(self.namespace): diff --git a/hatchet_sdk/clients/events.py b/hatchet_sdk/clients/events.py index 56a5ccf1..6ca74153 100644 --- a/hatchet_sdk/clients/events.py +++ b/hatchet_sdk/clients/events.py @@ -49,7 +49,11 @@ def push(self, event_key, payload, options: PushEventOptions = None) -> Event: namespace = self.namespace - if options is not None and "namespace" in options and options["namespace"] is not None: + if ( + options is not None + and "namespace" in options + and options["namespace"] is not None + ): namespace = options["namespace"] namespaced_event_key = namespace + event_key From 56f00e0bff31bdf09bbb4a4a58f30d938335b1e8 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Tue, 10 Sep 2024 12:04:13 -0400 Subject: [PATCH 3/3] chore: lint --- hatchet_sdk/loader.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hatchet_sdk/loader.py b/hatchet_sdk/loader.py index 51905b18..06cdb985 100644 --- a/hatchet_sdk/loader.py +++ b/hatchet_sdk/loader.py @@ -103,10 +103,12 @@ def get_config_value(key, env_var): server_url: str | None = None grpc_max_recv_message_length = get_config_value( - "grpc_max_recv_message_length", "HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH" + "grpc_max_recv_message_length", + "HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH", ) grpc_max_send_message_length = get_config_value( - "grpc_max_send_message_length", "HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH" + "grpc_max_send_message_length", + "HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH", ) if not host_port: