Skip to content

Commit

Permalink
feat: add ability to optionally disable internal HTTP send and receiv…
Browse files Browse the repository at this point in the history
…e spans (#2802)
  • Loading branch information
omgitsaheadcrab committed Sep 10, 2024
1 parent 9cced97 commit 02c9561
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 69 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-kafka-python` Instrument temporary fork, kafka-python-ng
inside kafka-python's instrumentation
([#2537](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2537))
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-fastapi` Add ability to disable internal HTTP send and receive spans
([#2802](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2802))

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def get_default_span_details(scope: dict) -> Tuple[str, dict]:


def _collect_target_attribute(
scope: typing.Dict[str, typing.Any]
scope: typing.Dict[str, typing.Any],
) -> typing.Optional[str]:
"""
Returns the target path as defined by the Semantic Conventions.
Expand Down Expand Up @@ -529,6 +529,7 @@ class OpenTelemetryMiddleware:
the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
"""

# pylint: disable=too-many-branches
Expand All @@ -547,6 +548,7 @@ def __init__(
http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[typing.Literal["receive", "send"]] | None = None,
):
# initialize semantic conventions opt-in if needed
_OpenTelemetrySemanticConventionStability._initialize()
Expand Down Expand Up @@ -653,6 +655,12 @@ def __init__(
)
or []
)
self.exclude_receive_span = (
"receive" in exclude_spans if exclude_spans else False
)
self.exclude_send_span = (
"send" in exclude_spans if exclude_spans else False
)

# pylint: disable=too-many-statements
async def __call__(
Expand Down Expand Up @@ -796,8 +804,10 @@ async def __call__(
span.end()

# pylint: enable=too-many-branches

def _get_otel_receive(self, server_span_name, scope, receive):
if self.exclude_receive_span:
return receive

@wraps(receive)
async def otel_receive():
with self.tracer.start_as_current_span(
Expand All @@ -821,6 +831,66 @@ async def otel_receive():

return otel_receive

def _set_send_span(
self,
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
):
"""Set send span attributes and status code."""
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)

if send_span.is_recording():
if message["type"] == "http.response.start":
expecting_trailers = message.get("trailers", False)
send_span.set_attribute("asgi.event.type", message["type"])

if status_code:
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
return expecting_trailers

def _set_server_span(
self, server_span, message, status_code, duration_attrs
):
"""Set server span attributes and status code."""
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(custom_response_attributes)

if status_code:
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)

def _get_otel_send(
self,
server_span,
Expand All @@ -834,74 +904,46 @@ def _get_otel_send(
@wraps(send)
async def otel_send(message: dict[str, Any]):
nonlocal expecting_trailers
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)

status_code = None
if message["type"] == "http.response.start":
status_code = message["status"]
elif message["type"] == "websocket.send":
status_code = 200

if send_span.is_recording():
if message["type"] == "http.response.start":
expecting_trailers = message.get("trailers", False)
send_span.set_attribute("asgi.event.type", message["type"])
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(
custom_response_attributes
)
if status_code:
# We record metrics only once
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
status_code = None
if message["type"] == "http.response.start":
status_code = message["status"]
elif message["type"] == "websocket.send":
status_code = 200

propagator = get_global_response_propagator()
if propagator:
propagator.inject(
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
setter=asgi_setter,
)
if not self.exclude_send_span:
expecting_trailers = self._set_send_span(
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
)

content_length = asgi_getter.get(message, "content-length")
if content_length:
try:
self.content_length_header = int(content_length[0])
except ValueError:
pass
self._set_server_span(
server_span, message, status_code, duration_attrs
)

propagator = get_global_response_propagator()
if propagator:
propagator.inject(
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
setter=asgi_setter,
)

content_length = asgi_getter.get(message, "content-length")
if content_length:
try:
self.content_length_header = int(content_length[0])
except ValueError:
pass

await send(message)

await send(message)
# pylint: disable=too-many-boolean-expressions
if (
not expecting_trailers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,30 @@ async def test_background_execution(self):
_SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S * 10**9,
)

async def test_exclude_internal_spans(self):
"""Test that internal spans are excluded from the emitted spans when
the `exclude_receive_span` or `exclude_send_span` attributes are set.
"""
cases = [
(["receive", "send"], ["GET / http receive", "GET / http send"]),
(["send"], ["GET / http send"]),
(["receive"], ["GET / http receive"]),
([], []),
]
for exclude_spans, excluded_spans in cases:
self.memory_exporter.clear()
app = otel_asgi.OpenTelemetryMiddleware(
simple_asgi, exclude_spans=exclude_spans
)
self.seed_app(app)
await self.send_default_request()
await self.get_all_output()
span_list = self.memory_exporter.get_finished_spans()
self.assertTrue(span_list)
for span in span_list:
for excluded_span in excluded_spans:
self.assertNotEqual(span.name, excluded_span)

async def test_trailers(self):
"""Test that trailers are emitted as expected and that the server span is ended
BEFORE the background task is finished."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def client_response_hook(span: Span, scope: dict[str, Any], message: dict[str, A
from __future__ import annotations

import logging
from typing import Collection
from typing import Collection, Literal

import fastapi
from starlette.routing import Match
Expand Down Expand Up @@ -222,7 +222,7 @@ class FastAPIInstrumentor(BaseInstrumentor):

@staticmethod
def instrument_app(
app: fastapi.FastAPI,
app,
server_request_hook: ServerRequestHook = None,
client_request_hook: ClientRequestHook = None,
client_response_hook: ClientResponseHook = None,
Expand All @@ -232,8 +232,28 @@ def instrument_app(
http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[Literal["receive", "send"]] | None = None,
):
"""Instrument an uninstrumented FastAPI application."""
"""Instrument an uninstrumented FastAPI application.
Args:
app: The fastapi ASGI application callable to forward requests to.
server_request_hook: Optional callback which is called with the server span and ASGI
scope object for every incoming request.
client_request_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method receive is called.
client_response_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method send is called.
tracer_provider: The optional tracer provider to use. If omitted
the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used.
excluded_urls: Optional comma delimited string of regexes to match URLs that should not be traced.
http_capture_headers_server_request: Optional list of HTTP headers to capture from the request.
http_capture_headers_server_response: Optional list of HTTP headers to capture from the response.
http_capture_headers_sanitize_fields: Optional list of HTTP headers to sanitize.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
"""
if not hasattr(app, "_is_instrumented_by_opentelemetry"):
app._is_instrumented_by_opentelemetry = False

Expand Down Expand Up @@ -273,6 +293,7 @@ def instrument_app(
http_capture_headers_server_request=http_capture_headers_server_request,
http_capture_headers_server_response=http_capture_headers_server_response,
http_capture_headers_sanitize_fields=http_capture_headers_sanitize_fields,
exclude_spans=exclude_spans,
)
app._is_instrumented_by_opentelemetry = True
if app not in _InstrumentedFastAPI._instrumented_fastapi_apps:
Expand Down Expand Up @@ -323,6 +344,7 @@ def _instrument(self, **kwargs):
else parse_excluded_urls(_excluded_urls)
)
_InstrumentedFastAPI._meter_provider = kwargs.get("meter_provider")
_InstrumentedFastAPI._exclude_spans = kwargs.get("exclude_spans")
fastapi.FastAPI = _InstrumentedFastAPI

def _uninstrument(self, **kwargs):
Expand Down Expand Up @@ -373,6 +395,7 @@ def __init__(self, *args, **kwargs):
http_capture_headers_server_request=_InstrumentedFastAPI._http_capture_headers_server_request,
http_capture_headers_server_response=_InstrumentedFastAPI._http_capture_headers_server_response,
http_capture_headers_sanitize_fields=_InstrumentedFastAPI._http_capture_headers_sanitize_fields,
exclude_spans=_InstrumentedFastAPI._exclude_spans,
)
self._is_instrumented_by_opentelemetry = True
_InstrumentedFastAPI._instrumented_fastapi_apps.add(self)
Expand Down

0 comments on commit 02c9561

Please sign in to comment.