Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open telemetry usage (Proof of Concept) #379

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions diracx-routers/src/diracx/routers/auth/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from cryptography.fernet import Fernet
from fastapi import Depends, HTTPException, status

from ..otel import async_tracer, sync_tracer, set_trace_attribute

from diracx.core.properties import (
SecurityProperty,
UnevaluatedProperty,
Expand Down Expand Up @@ -59,7 +61,7 @@ async def require_property(

_server_metadata_cache: TTLCache = TTLCache(maxsize=1024, ttl=3600)


@async_tracer()
async def get_server_metadata(url: str):
"""Get the server metadata from the IAM."""
server_metadata = _server_metadata_cache.get(url)
Expand All @@ -71,6 +73,8 @@ async def get_server_metadata(url: str):
raise NotImplementedError(res)
server_metadata = res.json()
_server_metadata_cache[url] = server_metadata

set_trace_attribute("meta", server_metadata)
return server_metadata


Expand Down Expand Up @@ -161,7 +165,7 @@ async def verify_dirac_refresh_token(

return (token["jti"], float(token["exp"]), token["legacy_exchange"])


@sync_tracer()
def parse_and_validate_scope(
scope: str, config: Config, available_properties: set[SecurityProperty]
) -> ScopeInfoDict:
Expand Down Expand Up @@ -224,13 +228,18 @@ def parse_and_validate_scope(
f"{set(properties)-set(available_properties)} are not valid properties"
)

set_trace_attribute("group", group)
set_trace_attribute("properties", set(sorted(properties)))
set_trace_attribute("vo", vo)

return {
"group": group,
"properties": set(sorted(properties)),
"vo": vo,
}


@async_tracer()
async def initiate_authorization_flow_with_iam(
config, vo: str, redirect_uri: str, state: dict[str, str], cipher_suite: Fernet
):
Expand Down Expand Up @@ -272,6 +281,12 @@ async def initiate_authorization_flow_with_iam(
f"state={encrypted_state}",
]
authorization_flow_url = f"{authorization_endpoint}?{'&'.join(urlParams)}"

set_trace_attribute("endpoint", authorization_endpoint)
for param in urlParams:
k, v = param.split("=", maxsplit=1)
set_trace_attribute(k, v)

return authorization_flow_url


Expand Down
6 changes: 5 additions & 1 deletion diracx-routers/src/diracx/routers/auth/well_known.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ..dependencies import Config, DevelopmentSettings
from ..fastapi_classes import DiracxRouter
from ..utils.users import AuthSettings
from ..otel import async_tracer, set_trace_attribute

router = DiracxRouter(require_auth=False, path_root="")

Expand All @@ -24,7 +25,7 @@ async def openid_configuration(
scopes_supported += [f"group:{vo}" for vo in config.Registry[vo].Groups]
scopes_supported += [f"property:{p}" for p in settings.available_properties]

return {
res = {
"issuer": settings.token_issuer,
"token_endpoint": str(request.url_for("token")),
"userinfo_endpoint:": str(request.url_for("userinfo")),
Expand All @@ -43,6 +44,9 @@ async def openid_configuration(
"code_challenge_methods_supported": ["S256"],
}

set_trace_attribute("config", res)

return res

class SupportInfo(TypedDict):
message: str
Expand Down
90 changes: 90 additions & 0 deletions diracx-routers/src/diracx/routers/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,96 @@

from diracx.core.settings import ServiceSettingsBase

from functools import wraps
import inspect

from collections import UserDict
from timeit import default_timer as timer


def async_tracer(name=None):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Obtain the module that contains the decorated function
package_name = get_module_name_from_func(func)

if not name:
tace_name = func.__name__

tracer = trace.get_tracer_provider().get_tracer(package_name)

# Create a span with name: diracx.diracx_xxx.(...).package.function
with tracer.start_as_current_span(f"{package_name}.{tace_name}"):
return await func(*args, **kwargs)

return wrapper
return decorator

def sync_tracer(name=None):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Obtain the module that contains the decorated function
module_name = get_module_name_from_func(func)

if not name:
tace_name = func.__name__

tracer = trace.get_tracer_provider().get_tracer(module_name)

# Create a span with name: diracx.diracx_xxx.(...).package.function
with tracer.start_as_current_span(f"{module_name}.{tace_name}"):
return func(*args, **kwargs)

return wrapper
return decorator

def set_trace_attribute(key, value, stringify=False):
span = trace.get_current_span()
if stringify:
span.set_attribute(f"diracx.{key}", str(value))
else:
_recursive_set_trace_attribute(span, f"diracx.{key}", value)


def _recursive_set_trace_attribute(span, key, value):
if isinstance(value, list):
zeros = len(str(len(value)))
for idx, item in enumerate(value):
_recursive_set_trace_attribute(span, f"{key}[{str(idx).zfill(zeros)}]", item)

elif isinstance(value, set) or isinstance(value, tuple):
zeros = len(str(len(value)))
for idx, item in enumerate(value):
_recursive_set_trace_attribute(span, f"{key}.item_{str(idx).zfill(zeros)}", item)

elif isinstance(value, dict) or isinstance(value, UserDict):
for k, v in value.items():
_recursive_set_trace_attribute(span, f"{key}.{k}", v)

else:
span.set_attribute(key, value)


def increase_counter(meter_name, counter_name, amount=1, is_updown=False):
meter = metrics.get_meter_provider().get_meter(meter_name)

if is_updown:
metric = meter.create_up_down_counter(counter_name)
else:
metric = meter.create_counter(counter_name)

metric.add(amount)

def get_module_name_from_func(func):
from_module = inspect.getmodule(func)

if not from_module:
return "diracx"

module_name = from_module.__name__
return module_name

class OTELSettings(ServiceSettingsBase):
"""Settings for the Open Telemetry Configuration."""
Expand Down