
Adds OpenTelemetry Support (#262)
Adds OpenTelemetry support to the Python SDK, configured via environment variables. Key features:

- If the environment variables are set, traces and spans are sent to your collector automatically (see the configuration sketch below).
- If the environment variables are unset, the tracer is a no-op, and nothing happens from the user's point of view.
- Adds a few basic spans that let users track jobs being submitted and run, time their execution, and so on.
- Uses the OTel carrier to correctly record child runs as children of their parent runs.
- Allows the client to pass its own OTel traceparent to correlate spans with other parts of its application (see the usage sketch at the end of this page).
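The diff below calls `create_tracer(config=config)` but does not show its body. Here is a minimal sketch of the implied behavior, assuming an OTLP exporter and the standard `OTEL_EXPORTER_OTLP_ENDPOINT` knob; the environment variable name and the exact no-op fallback are assumptions, not confirmed by this commit:

```python
# Sketch only: the env var name and config handling are illustrative
# assumptions; see hatchet_sdk's loader/ClientConfig for the real knobs.
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def create_tracer(config) -> trace.Tracer:
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")  # assumed knob

    if not endpoint:
        # No collector configured: hand back a no-op tracer so the
        # span calls in admin.py below do nothing from the user's
        # point of view.
        return trace.NoOpTracer()

    # Collector configured: export spans in batches over OTLP/gRPC.
    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint)))
    return provider.get_tracer("hatchet_sdk")
```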
hatchet-temporary authored Nov 26, 2024
1 parent 22da1ec commit b4130b1
Showing 9 changed files with 987 additions and 227 deletions.
188 changes: 129 additions & 59 deletions hatchet_sdk/clients/admin.py
@@ -23,6 +23,13 @@
WorkflowVersion,
)
from hatchet_sdk.contracts.workflows_pb2_grpc import WorkflowServiceStub
from hatchet_sdk.utils.serialization import flatten
from hatchet_sdk.utils.tracing import (
create_carrier,
create_tracer,
inject_carrier_into_metadata,
parse_carrier_from_metadata,
)
from hatchet_sdk.workflow_run import RunRef, WorkflowRunRef

from ..loader import ClientConfig
@@ -164,6 +171,7 @@ def __init__(self, config: ClientConfig):
self.token = config.token
self.listener_client = new_listener(config)
self.namespace = config.namespace
self.otel_tracer = create_tracer(config=config)

async def run(
self,
@@ -186,44 +194,74 @@ async def run_workflow(
async def run_workflow(
self, workflow_name: str, input: any, options: TriggerWorkflowOptions = None
) -> WorkflowRunRef:
try:
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)
ctx = parse_carrier_from_metadata(
(options or {}).get("additional_metadata", {})
)

namespace = self.namespace
with self.otel_tracer.start_as_current_span(
f"hatchet.async_run_workflow.{workflow_name}", context=ctx
) as span:
carrier = create_carrier()

if (
options is not None
and "namespace" in options
and options["namespace"] is not None
):
namespace = options["namespace"]
del options["namespace"]
try:
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(
self.config
)

if namespace != "" and not workflow_name.startswith(self.namespace):
workflow_name = f"{namespace}{workflow_name}"
namespace = self.namespace

request = self._prepare_workflow_request(workflow_name, input, options)
resp: TriggerWorkflowResponse = await self.aio_client.TriggerWorkflow(
request,
metadata=get_metadata(self.token),
)
return WorkflowRunRef(
workflow_run_id=resp.workflow_run_id,
workflow_listener=self.pooled_workflow_listener,
workflow_run_event_listener=self.listener_client,
)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise DedupeViolationErr(e.details())
if (
options is not None
and "namespace" in options
and options["namespace"] is not None
):
namespace = options.pop("namespace")

raise ValueError(f"gRPC error: {e}")
if namespace != "" and not workflow_name.startswith(self.namespace):
workflow_name = f"{namespace}{workflow_name}"

if options is not None and "additional_metadata" in options:
options["additional_metadata"] = inject_carrier_into_metadata(
options["additional_metadata"], carrier
)
span.set_attributes(
flatten(
options["additional_metadata"], parent_key="", separator="."
)
)

request = self._prepare_workflow_request(workflow_name, input, options)

span.add_event(
"Triggering workflow", attributes={"workflow_name": workflow_name}
)

resp: TriggerWorkflowResponse = await self.aio_client.TriggerWorkflow(
request,
metadata=get_metadata(self.token),
)

span.add_event(
"Received workflow response",
attributes={"workflow_name": workflow_name},
)

return WorkflowRunRef(
workflow_run_id=resp.workflow_run_id,
workflow_listener=self.pooled_workflow_listener,
workflow_run_event_listener=self.listener_client,
)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise DedupeViolationErr(e.details())

raise ValueError(f"gRPC error: {e}")

@tenacity_retry
async def run_workflows(
self, workflows: List[WorkflowRunDict], options: TriggerWorkflowOptions = None
) -> List[WorkflowRunRef]:

if len(workflows) == 0:
raise ValueError("No workflows to run")
try:
@@ -243,7 +281,6 @@ async def run_workflows(
workflow_run_requests: TriggerWorkflowRequest = []

for workflow in workflows:

workflow_name = workflow["workflow_name"]
input_data = workflow["input"]
options = workflow["options"]
@@ -360,6 +397,7 @@ def __init__(self, config: ClientConfig):
self.token = config.token
self.listener_client = new_listener(config)
self.namespace = config.namespace
self.otel_tracer = create_tracer(config=config)

@tenacity_retry
def put_workflow(
@@ -408,7 +446,6 @@ def schedule_workflow(
options: ScheduleTriggerWorkflowOptions = None,
) -> WorkflowVersion:
try:

namespace = self.namespace

if (
@@ -436,55 +473,88 @@

raise ValueError(f"gRPC error: {e}")

## TODO: `options` is treated as a dict (wrong type hint)
## TODO: `any` type hint should come from `typing`
@tenacity_retry
def run_workflow(
self, workflow_name: str, input: any, options: TriggerWorkflowOptions = None
) -> WorkflowRunRef:
try:
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)
ctx = parse_carrier_from_metadata(
(options or {}).get("additional_metadata", {})
)

namespace = self.namespace
with self.otel_tracer.start_as_current_span(
f"hatchet.run_workflow.{workflow_name}", context=ctx
) as span:
carrier = create_carrier()

if (
options is not None
and "namespace" in options
and options["namespace"] is not None
):
namespace = options["namespace"]
del options["namespace"]
try:
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(
self.config
)

if namespace != "" and not workflow_name.startswith(self.namespace):
workflow_name = f"{namespace}{workflow_name}"
namespace = self.namespace

request = self._prepare_workflow_request(workflow_name, input, options)
resp: TriggerWorkflowResponse = self.client.TriggerWorkflow(
request,
metadata=get_metadata(self.token),
)
return WorkflowRunRef(
workflow_run_id=resp.workflow_run_id,
workflow_listener=self.pooled_workflow_listener,
workflow_run_event_listener=self.listener_client,
)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise DedupeViolationErr(e.details())
## TODO: Factor this out - it's repeated a lot of places
if (
options is not None
and "namespace" in options
and options["namespace"] is not None
):
namespace = options.pop("namespace")

raise ValueError(f"gRPC error: {e}")
if options is not None and "additional_metadata" in options:
options["additional_metadata"] = inject_carrier_into_metadata(
options["additional_metadata"], carrier
)

span.set_attributes(
flatten(
options["additional_metadata"], parent_key="", separator="."
)
)

if namespace != "" and not workflow_name.startswith(self.namespace):
workflow_name = f"{namespace}{workflow_name}"

request = self._prepare_workflow_request(workflow_name, input, options)

span.add_event(
"Triggering workflow", attributes={"workflow_name": workflow_name}
)

resp: TriggerWorkflowResponse = self.client.TriggerWorkflow(
request,
metadata=get_metadata(self.token),
)

span.add_event(
"Received workflow response",
attributes={"workflow_name": workflow_name},
)

return WorkflowRunRef(
workflow_run_id=resp.workflow_run_id,
workflow_listener=self.pooled_workflow_listener,
workflow_run_event_listener=self.listener_client,
)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.ALREADY_EXISTS:
raise DedupeViolationErr(e.details())

raise ValueError(f"gRPC error: {e}")

@tenacity_retry
def run_workflows(
self, workflows: List[WorkflowRunDict], options: TriggerWorkflowOptions = None
) -> list[WorkflowRunRef]:

workflow_run_requests: TriggerWorkflowRequest = []
try:
if not self.pooled_workflow_listener:
self.pooled_workflow_listener = PooledWorkflowRunListener(self.config)

for workflow in workflows:

workflow_name = workflow["workflow_name"]
input_data = workflow["input"]
options = workflow["options"]
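The helpers imported at the top of admin.py (`create_carrier`, `inject_carrier_into_metadata`, `parse_carrier_from_metadata`) are not shown in this diff. A plausible sketch, assuming they wrap the standard W3C trace-context propagator — this is inferred from the call sites above, not the actual `hatchet_sdk.utils.tracing` source:

```python
# Sketch of the tracing helpers used in admin.py; assumptions inferred
# from how the diff uses them, not the actual implementation.
from typing import Optional

from opentelemetry.context import Context
from opentelemetry.trace.propagation.tracecontext import (
    TraceContextTextMapPropagator,
)

_propagator = TraceContextTextMapPropagator()


def create_carrier() -> dict[str, str]:
    # Capture the current span context as a W3C trace-context carrier,
    # e.g. {"traceparent": "00-<trace_id>-<span_id>-01"}.
    carrier: dict[str, str] = {}
    _propagator.inject(carrier)
    return carrier


def inject_carrier_into_metadata(
    metadata: dict[str, str], carrier: dict[str, str]
) -> dict[str, str]:
    # Merge the carrier into additional_metadata so a child run can
    # reconstruct its parent's span context on the other side.
    return {**metadata, **carrier}


def parse_carrier_from_metadata(metadata: dict) -> Optional[Context]:
    # Recover a context from metadata previously populated by
    # inject_carrier_into_metadata, or from a client-supplied
    # traceparent; returning None leaves the new span parentless.
    return _propagator.extract(metadata) if metadata else None
```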
27 changes: 25 additions & 2 deletions hatchet_sdk/clients/dispatcher/action_listener.py
@@ -2,7 +2,7 @@
import json
import time
from dataclasses import dataclass, field
from typing import AsyncGenerator, List, Optional
from typing import Any, AsyncGenerator, List, Optional

import grpc
from grpc._cython import cygrpc
@@ -23,6 +23,7 @@
from hatchet_sdk.contracts.dispatcher_pb2_grpc import DispatcherStub
from hatchet_sdk.logger import logger
from hatchet_sdk.utils.backoff import exp_backoff_sleep
from hatchet_sdk.utils.serialization import flatten

from ...loader import ClientConfig
from ...metadata import get_metadata
@@ -88,6 +89,29 @@ def __post_init__(self):
if not isinstance(self.additional_metadata, dict):
self.additional_metadata = {}

@property
def otel_attributes(self) -> dict[str, Any]:
return flatten(
xs={
"worker_id": self.worker_id,
"tenant_id": self.tenant_id,
"workflow_run_id": self.workflow_run_id,
"get_group_key_run_id": self.get_group_key_run_id,
"job_id": self.job_id,
"job_name": self.job_name,
"job_run_id": self.job_run_id,
"step_id": self.step_id,
"step_run_id": self.step_run_id,
"retry_count": self.retry_count,
"child_workflow_index": self.child_workflow_index,
"child_workflow_key": self.child_workflow_key,
"parent_workflow_run_id": self.parent_workflow_run_id,
"action_payload": self.action_payload,
},
parent_key="",
separator=".",
)


START_STEP_RUN = 0
CANCEL_STEP_RUN = 1
@@ -202,7 +226,6 @@ async def _generator(self) -> AsyncGenerator[Action, None]:
listener = None

while not self.stop_signal:

if listener is not None:
listener.cancel()

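The `flatten` helper used for span attributes here (and for `otel_attributes` above) is likewise not shown. A sketch of the implied behavior — collapsing nested dicts into dot-separated keys so every value is a scalar OTel attribute — inferred from its call sites, not the actual `hatchet_sdk.utils.serialization` source:

```python
# Sketch of flatten, inferred from its call sites:
# flatten(xs, parent_key="", separator="."); assumed behavior only.
from typing import Any


def flatten(xs: dict[str, Any], parent_key: str, separator: str) -> dict[str, Any]:
    # Collapse nested dicts into dot-separated keys, e.g.
    # {"a": {"b": 1}} -> {"a.b": 1}, so each value can be set
    # directly as an OTel span attribute.
    items: dict[str, Any] = {}
    for key, value in xs.items():
        new_key = f"{parent_key}{separator}{key}" if parent_key else key
        if isinstance(value, dict):
            items.update(flatten(value, new_key, separator))
        else:
            items[new_key] = value
    return items
```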

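Putting it together: a client that wants Hatchet's spans to appear under one of its own application traces can inject a traceparent through `additional_metadata`, which `parse_carrier_from_metadata` picks up on the SDK side (see admin.py above). A usage sketch — the `Hatchet()` client setup and the workflow name are illustrative assumptions:

```python
# Sketch: correlating a Hatchet run with an existing application
# trace. Client construction and workflow name are assumed.
from hatchet_sdk import Hatchet
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import (
    TraceContextTextMapPropagator,
)

hatchet = Hatchet()
tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("my-app.checkout"):
    # Export the current span context as a traceparent carrier...
    carrier: dict[str, str] = {}
    TraceContextTextMapPropagator().inject(carrier)

    # ...and hand it to Hatchet via additional_metadata, so the
    # hatchet.run_workflow.* span is parented under this one.
    hatchet.admin.run_workflow(
        "checkout-workflow",
        {"order_id": 123},
        options={"additional_metadata": carrier},
    )
```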