Skip to content

Commit a8876bf

Browse files
authored
Merge pull request #140 from GetStream/otel
Adds tracing and metrics with otel
2 parents 68ec2e7 + 7e64a16 commit a8876bf

24 files changed

+2038
-1410
lines changed

getstream/base.py

Lines changed: 180 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import time
23
from typing import Any, Dict, Optional, Type, get_origin
34

45
from getstream.models import APIError
@@ -9,6 +10,13 @@
910
from getstream.config import BaseConfig
1011
from urllib.parse import quote
1112
from abc import ABC
13+
from getstream.common.telemetry import (
14+
common_attributes,
15+
record_metrics,
16+
span_request,
17+
current_operation,
18+
metric_attributes,
19+
)
1220

1321

1422
def build_path(path: str, path_params: dict) -> str:
@@ -46,7 +54,38 @@ def _parse_response(
4654
return StreamResponse(response, data)
4755

4856

49-
class BaseClient(BaseConfig, ResponseParserMixin, ABC):
57+
class TelemetryEndpointMixin:
58+
def _normalize_endpoint_from_path(self, path: str) -> str:
59+
# Convert /api/v2/video/call/{type}/{id} -> api.v2.video.call.$type.$id
60+
norm_parts = []
61+
for p in path.strip("/").split("/"):
62+
if not p:
63+
continue
64+
if p.startswith("{") and p.endswith("}"):
65+
name = p[1:-1].strip()
66+
if name:
67+
norm_parts.append(f"${name}")
68+
else:
69+
norm_parts.append(p)
70+
return ".".join(norm_parts) if norm_parts else "root"
71+
72+
def _prepare_request(self, method: str, path: str, query_params, kwargs):
73+
path_params = kwargs.get("path_params") if kwargs else None
74+
url_path = (
75+
build_path(path, path_params) if path_params else build_path(path, None)
76+
)
77+
url_full = f"{self.base_url}{url_path}"
78+
endpoint = self._endpoint_name(path)
79+
span_attrs = common_attributes(
80+
api_key=self.api_key,
81+
endpoint=endpoint,
82+
method=method,
83+
url=url_full,
84+
)
85+
return url_path, url_full, endpoint, span_attrs
86+
87+
88+
class BaseClient(TelemetryEndpointMixin, BaseConfig, ResponseParserMixin, ABC):
5089
def __init__(
5190
self,
5291
api_key,
@@ -73,6 +112,46 @@ def __enter__(self):
73112
def __exit__(self, exc_type, exc_val, exc_tb):
74113
self.close()
75114

115+
def _endpoint_name(self, path: str) -> str:
116+
op = current_operation(None)
117+
if op:
118+
return op
119+
return self._normalize_endpoint_from_path(path)
120+
121+
def _request_sync(
122+
self, method: str, path: str, *, query_params=None, args=(), kwargs=None
123+
):
124+
kwargs = kwargs or {}
125+
url_path, url_full, endpoint, attrs = self._prepare_request(
126+
method, path, query_params, kwargs
127+
)
128+
start = time.perf_counter()
129+
# Span name uses logical operation (endpoint) rather than raw HTTP
130+
with span_request(
131+
endpoint, attributes=attrs, request_body=kwargs.get("json")
132+
) as span:
133+
call_kwargs = dict(kwargs)
134+
call_kwargs.pop("path_params", None)
135+
response = getattr(self.client, method.lower())(
136+
url_path, params=query_params, *args, **call_kwargs
137+
)
138+
try:
139+
span and span.set_attribute(
140+
"http.response.status_code", response.status_code
141+
)
142+
except Exception:
143+
pass
144+
duration_ms = (time.perf_counter() - start) * 1000.0
145+
# Metrics should be low-cardinality: exclude url/call_cid/channel_cid
146+
metric_attrs = metric_attributes(
147+
api_key=self.api_key,
148+
endpoint=endpoint,
149+
method=method,
150+
status_code=getattr(response, "status_code", None),
151+
)
152+
record_metrics(duration_ms, attributes=metric_attrs)
153+
return response
154+
76155
def patch(
77156
self,
78157
path,
@@ -82,8 +161,12 @@ def patch(
82161
*args,
83162
**kwargs,
84163
) -> StreamResponse[T]:
85-
response = self.client.patch(
86-
build_path(path, path_params), params=query_params, *args, **kwargs
164+
response = self._request_sync(
165+
"PATCH",
166+
path,
167+
query_params=query_params,
168+
args=args,
169+
kwargs=kwargs | {"path_params": path_params},
87170
)
88171
return self._parse_response(response, data_type or Dict[str, Any])
89172

@@ -96,8 +179,12 @@ def get(
96179
*args,
97180
**kwargs,
98181
) -> StreamResponse[T]:
99-
response = self.client.get(
100-
build_path(path, path_params), params=query_params, *args, **kwargs
182+
response = self._request_sync(
183+
"GET",
184+
path,
185+
query_params=query_params,
186+
args=args,
187+
kwargs=kwargs | {"path_params": path_params},
101188
)
102189
return self._parse_response(response, data_type or Dict[str, Any])
103190

@@ -110,10 +197,13 @@ def post(
110197
*args,
111198
**kwargs,
112199
) -> StreamResponse[T]:
113-
response = self.client.post(
114-
build_path(path, path_params), params=query_params, *args, **kwargs
200+
response = self._request_sync(
201+
"POST",
202+
path,
203+
query_params=query_params,
204+
args=args,
205+
kwargs=kwargs | {"path_params": path_params},
115206
)
116-
117207
return self._parse_response(response, data_type or Dict[str, Any])
118208

119209
def put(
@@ -125,8 +215,12 @@ def put(
125215
*args,
126216
**kwargs,
127217
) -> StreamResponse[T]:
128-
response = self.client.put(
129-
build_path(path, path_params), params=query_params, *args, **kwargs
218+
response = self._request_sync(
219+
"PUT",
220+
path,
221+
query_params=query_params,
222+
args=args,
223+
kwargs=kwargs | {"path_params": path_params},
130224
)
131225
return self._parse_response(response, data_type or Dict[str, Any])
132226

@@ -139,8 +233,12 @@ def delete(
139233
*args,
140234
**kwargs,
141235
) -> StreamResponse[T]:
142-
response = self.client.delete(
143-
build_path(path, path_params), params=query_params, *args, **kwargs
236+
response = self._request_sync(
237+
"DELETE",
238+
path,
239+
query_params=query_params,
240+
args=args,
241+
kwargs=kwargs | {"path_params": path_params},
144242
)
145243
return self._parse_response(response, data_type or Dict[str, Any])
146244

@@ -151,7 +249,7 @@ def close(self):
151249
self.client.close()
152250

153251

154-
class AsyncBaseClient(BaseConfig, ResponseParserMixin, ABC):
252+
class AsyncBaseClient(TelemetryEndpointMixin, BaseConfig, ResponseParserMixin, ABC):
155253
def __init__(
156254
self,
157255
api_key,
@@ -182,6 +280,45 @@ async def aclose(self):
182280
"""Close HTTPX async client (closes pools/keep-alives)."""
183281
await self.client.aclose()
184282

283+
def _endpoint_name(self, path: str) -> str:
284+
op = current_operation(None)
285+
if op:
286+
return op
287+
return self._normalize_endpoint_from_path(path)
288+
289+
async def _request_async(
290+
self, method: str, path: str, *, query_params=None, args=(), kwargs=None
291+
):
292+
kwargs = kwargs or {}
293+
url_path, url_full, endpoint, attrs = self._prepare_request(
294+
method, path, query_params, kwargs
295+
)
296+
start = time.perf_counter()
297+
with span_request(
298+
endpoint, attributes=attrs, request_body=kwargs.get("json")
299+
) as span:
300+
call_kwargs = dict(kwargs)
301+
call_kwargs.pop("path_params", None)
302+
response = await getattr(self.client, method.lower())(
303+
url_path, params=query_params, *args, **call_kwargs
304+
)
305+
try:
306+
span and span.set_attribute(
307+
"http.response.status_code", response.status_code
308+
)
309+
except Exception:
310+
pass
311+
duration_ms = (time.perf_counter() - start) * 1000.0
312+
# Metrics should be low-cardinality: exclude url/call_cid/channel_cid
313+
metric_attrs = metric_attributes(
314+
api_key=self.api_key,
315+
endpoint=endpoint,
316+
method=method,
317+
status_code=getattr(response, "status_code", None),
318+
)
319+
record_metrics(duration_ms, attributes=metric_attrs)
320+
return response
321+
185322
async def patch(
186323
self,
187324
path,
@@ -191,8 +328,12 @@ async def patch(
191328
*args,
192329
**kwargs,
193330
) -> StreamResponse[T]:
194-
response = await self.client.patch(
195-
build_path(path, path_params), params=query_params, *args, **kwargs
331+
response = await self._request_async(
332+
"PATCH",
333+
path,
334+
query_params=query_params,
335+
args=args,
336+
kwargs=kwargs | {"path_params": path_params},
196337
)
197338
return self._parse_response(response, data_type or Dict[str, Any])
198339

@@ -205,8 +346,12 @@ async def get(
205346
*args,
206347
**kwargs,
207348
) -> StreamResponse[T]:
208-
response = await self.client.get(
209-
build_path(path, path_params), params=query_params, *args, **kwargs
349+
response = await self._request_async(
350+
"GET",
351+
path,
352+
query_params=query_params,
353+
args=args,
354+
kwargs=kwargs | {"path_params": path_params},
210355
)
211356
return self._parse_response(response, data_type or Dict[str, Any])
212357

@@ -219,10 +364,13 @@ async def post(
219364
*args,
220365
**kwargs,
221366
) -> StreamResponse[T]:
222-
response = await self.client.post(
223-
build_path(path, path_params), params=query_params, *args, **kwargs
367+
response = await self._request_async(
368+
"POST",
369+
path,
370+
query_params=query_params,
371+
args=args,
372+
kwargs=kwargs | {"path_params": path_params},
224373
)
225-
226374
return self._parse_response(response, data_type or Dict[str, Any])
227375

228376
async def put(
@@ -234,8 +382,12 @@ async def put(
234382
*args,
235383
**kwargs,
236384
) -> StreamResponse[T]:
237-
response = await self.client.put(
238-
build_path(path, path_params), params=query_params, *args, **kwargs
385+
response = await self._request_async(
386+
"PUT",
387+
path,
388+
query_params=query_params,
389+
args=args,
390+
kwargs=kwargs | {"path_params": path_params},
239391
)
240392
return self._parse_response(response, data_type or Dict[str, Any])
241393

@@ -248,8 +400,12 @@ async def delete(
248400
*args,
249401
**kwargs,
250402
) -> StreamResponse[T]:
251-
response = await self.client.delete(
252-
build_path(path, path_params), params=query_params, *args, **kwargs
403+
response = await self._request_async(
404+
"DELETE",
405+
path,
406+
query_params=query_params,
407+
args=args,
408+
kwargs=kwargs | {"path_params": path_params},
253409
)
254410
return self._parse_response(response, data_type or Dict[str, Any])
255411

0 commit comments

Comments
 (0)