diff --git a/CHANGELOG.md b/CHANGELOG.md index 003f8054f6..d091ddeb9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-kafka-python` Instrument temporary fork, kafka-python-ng inside kafka-python's instrumentation ([#2537](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2537)) +- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-fastapi` Add ability to disable internal HTTP send and receive spans + ([#2802](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2802)) ### Breaking changes diff --git a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py index 420fd512d6..d25ca41017 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py @@ -483,7 +483,7 @@ def get_default_span_details(scope: dict) -> Tuple[str, dict]: def _collect_target_attribute( - scope: typing.Dict[str, typing.Any] + scope: typing.Dict[str, typing.Any], ) -> typing.Optional[str]: """ Returns the target path as defined by the Semantic Conventions. @@ -529,6 +529,7 @@ class OpenTelemetryMiddleware: the current globally configured one is used. meter_provider: The optional meter provider to use. If omitted the current globally configured one is used. + exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace. """ # pylint: disable=too-many-branches @@ -547,6 +548,7 @@ def __init__( http_capture_headers_server_request: list[str] | None = None, http_capture_headers_server_response: list[str] | None = None, http_capture_headers_sanitize_fields: list[str] | None = None, + exclude_spans: list[typing.Literal["receive", "send"]] | None = None, ): # initialize semantic conventions opt-in if needed _OpenTelemetrySemanticConventionStability._initialize() @@ -653,6 +655,12 @@ def __init__( ) or [] ) + self.exclude_receive_span = ( + "receive" in exclude_spans if exclude_spans else False + ) + self.exclude_send_span = ( + "send" in exclude_spans if exclude_spans else False + ) # pylint: disable=too-many-statements async def __call__( @@ -796,8 +804,10 @@ async def __call__( span.end() # pylint: enable=too-many-branches - def _get_otel_receive(self, server_span_name, scope, receive): + if self.exclude_receive_span: + return receive + @wraps(receive) async def otel_receive(): with self.tracer.start_as_current_span( @@ -821,6 +831,66 @@ async def otel_receive(): return otel_receive + def _set_send_span( + self, + server_span_name, + scope, + send, + message, + status_code, + expecting_trailers, + ): + """Set send span attributes and status code.""" + with self.tracer.start_as_current_span( + " ".join((server_span_name, scope["type"], "send")) + ) as send_span: + if callable(self.client_response_hook): + self.client_response_hook(send_span, scope, message) + + if send_span.is_recording(): + if message["type"] == "http.response.start": + expecting_trailers = message.get("trailers", False) + send_span.set_attribute("asgi.event.type", message["type"]) + + if status_code: + set_status_code( + send_span, + status_code, + None, + self._sem_conv_opt_in_mode, + ) + return expecting_trailers + + def _set_server_span( + self, server_span, message, status_code, duration_attrs + ): + """Set server span attributes and status code.""" + if ( + server_span.is_recording() + and server_span.kind == trace.SpanKind.SERVER + and "headers" in message + ): + custom_response_attributes = ( + collect_custom_headers_attributes( + message, + self.http_capture_headers_sanitize_fields, + self.http_capture_headers_server_response, + normalise_response_header_name, + ) + if self.http_capture_headers_server_response + else {} + ) + if len(custom_response_attributes) > 0: + server_span.set_attributes(custom_response_attributes) + + if status_code: + set_status_code( + server_span, + status_code, + duration_attrs, + self._sem_conv_opt_in_mode, + ) + def _get_otel_send( self, server_span, @@ -834,74 +904,46 @@ def _get_otel_send( @wraps(send) async def otel_send(message: dict[str, Any]): nonlocal expecting_trailers - with self.tracer.start_as_current_span( - " ".join((server_span_name, scope["type"], "send")) - ) as send_span: - if callable(self.client_response_hook): - self.client_response_hook(send_span, scope, message) - status_code = None - if message["type"] == "http.response.start": - status_code = message["status"] - elif message["type"] == "websocket.send": - status_code = 200 - - if send_span.is_recording(): - if message["type"] == "http.response.start": - expecting_trailers = message.get("trailers", False) - send_span.set_attribute("asgi.event.type", message["type"]) - if ( - server_span.is_recording() - and server_span.kind == trace.SpanKind.SERVER - and "headers" in message - ): - custom_response_attributes = ( - collect_custom_headers_attributes( - message, - self.http_capture_headers_sanitize_fields, - self.http_capture_headers_server_response, - normalise_response_header_name, - ) - if self.http_capture_headers_server_response - else {} - ) - if len(custom_response_attributes) > 0: - server_span.set_attributes( - custom_response_attributes - ) - if status_code: - # We record metrics only once - set_status_code( - server_span, - status_code, - duration_attrs, - self._sem_conv_opt_in_mode, - ) - set_status_code( - send_span, - status_code, - None, - self._sem_conv_opt_in_mode, - ) + status_code = None + if message["type"] == "http.response.start": + status_code = message["status"] + elif message["type"] == "websocket.send": + status_code = 200 - propagator = get_global_response_propagator() - if propagator: - propagator.inject( - message, - context=set_span_in_context( - server_span, trace.context_api.Context() - ), - setter=asgi_setter, - ) + if not self.exclude_send_span: + expecting_trailers = self._set_send_span( + server_span_name, + scope, + send, + message, + status_code, + expecting_trailers, + ) - content_length = asgi_getter.get(message, "content-length") - if content_length: - try: - self.content_length_header = int(content_length[0]) - except ValueError: - pass + self._set_server_span( + server_span, message, status_code, duration_attrs + ) + + propagator = get_global_response_propagator() + if propagator: + propagator.inject( + message, + context=set_span_in_context( + server_span, trace.context_api.Context() + ), + setter=asgi_setter, + ) + + content_length = asgi_getter.get(message, "content-length") + if content_length: + try: + self.content_length_header = int(content_length[0]) + except ValueError: + pass + + await send(message) - await send(message) # pylint: disable=too-many-boolean-expressions if ( not expecting_trailers diff --git a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py index a21dd626c8..a9d7897ea6 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py @@ -566,6 +566,30 @@ async def test_background_execution(self): _SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S * 10**9, ) + async def test_exclude_internal_spans(self): + """Test that internal spans are excluded from the emitted spans when + the `exclude_receive_span` or `exclude_send_span` attributes are set. + """ + cases = [ + (["receive", "send"], ["GET / http receive", "GET / http send"]), + (["send"], ["GET / http send"]), + (["receive"], ["GET / http receive"]), + ([], []), + ] + for exclude_spans, excluded_spans in cases: + self.memory_exporter.clear() + app = otel_asgi.OpenTelemetryMiddleware( + simple_asgi, exclude_spans=exclude_spans + ) + self.seed_app(app) + await self.send_default_request() + await self.get_all_output() + span_list = self.memory_exporter.get_finished_spans() + self.assertTrue(span_list) + for span in span_list: + for excluded_span in excluded_spans: + self.assertNotEqual(span.name, excluded_span) + async def test_trailers(self): """Test that trailers are emitted as expected and that the server span is ended BEFORE the background task is finished.""" diff --git a/instrumentation/opentelemetry-instrumentation-fastapi/src/opentelemetry/instrumentation/fastapi/__init__.py b/instrumentation/opentelemetry-instrumentation-fastapi/src/opentelemetry/instrumentation/fastapi/__init__.py index 37a293764e..7e4d0aac07 100644 --- a/instrumentation/opentelemetry-instrumentation-fastapi/src/opentelemetry/instrumentation/fastapi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-fastapi/src/opentelemetry/instrumentation/fastapi/__init__.py @@ -179,7 +179,7 @@ def client_response_hook(span: Span, scope: dict[str, Any], message: dict[str, A from __future__ import annotations import logging -from typing import Collection +from typing import Collection, Literal import fastapi from starlette.routing import Match @@ -222,7 +222,7 @@ class FastAPIInstrumentor(BaseInstrumentor): @staticmethod def instrument_app( - app: fastapi.FastAPI, + app, server_request_hook: ServerRequestHook = None, client_request_hook: ClientRequestHook = None, client_response_hook: ClientResponseHook = None, @@ -232,8 +232,28 @@ def instrument_app( http_capture_headers_server_request: list[str] | None = None, http_capture_headers_server_response: list[str] | None = None, http_capture_headers_sanitize_fields: list[str] | None = None, + exclude_spans: list[Literal["receive", "send"]] | None = None, ): - """Instrument an uninstrumented FastAPI application.""" + """Instrument an uninstrumented FastAPI application. + + Args: + app: The fastapi ASGI application callable to forward requests to. + server_request_hook: Optional callback which is called with the server span and ASGI + scope object for every incoming request. + client_request_hook: Optional callback which is called with the internal span, and ASGI + scope and event which are sent as dictionaries for when the method receive is called. + client_response_hook: Optional callback which is called with the internal span, and ASGI + scope and event which are sent as dictionaries for when the method send is called. + tracer_provider: The optional tracer provider to use. If omitted + the current globally configured one is used. + meter_provider: The optional meter provider to use. If omitted + the current globally configured one is used. + excluded_urls: Optional comma delimited string of regexes to match URLs that should not be traced. + http_capture_headers_server_request: Optional list of HTTP headers to capture from the request. + http_capture_headers_server_response: Optional list of HTTP headers to capture from the response. + http_capture_headers_sanitize_fields: Optional list of HTTP headers to sanitize. + exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace. + """ if not hasattr(app, "_is_instrumented_by_opentelemetry"): app._is_instrumented_by_opentelemetry = False @@ -273,6 +293,7 @@ def instrument_app( http_capture_headers_server_request=http_capture_headers_server_request, http_capture_headers_server_response=http_capture_headers_server_response, http_capture_headers_sanitize_fields=http_capture_headers_sanitize_fields, + exclude_spans=exclude_spans, ) app._is_instrumented_by_opentelemetry = True if app not in _InstrumentedFastAPI._instrumented_fastapi_apps: @@ -323,6 +344,7 @@ def _instrument(self, **kwargs): else parse_excluded_urls(_excluded_urls) ) _InstrumentedFastAPI._meter_provider = kwargs.get("meter_provider") + _InstrumentedFastAPI._exclude_spans = kwargs.get("exclude_spans") fastapi.FastAPI = _InstrumentedFastAPI def _uninstrument(self, **kwargs): @@ -373,6 +395,7 @@ def __init__(self, *args, **kwargs): http_capture_headers_server_request=_InstrumentedFastAPI._http_capture_headers_server_request, http_capture_headers_server_response=_InstrumentedFastAPI._http_capture_headers_server_response, http_capture_headers_sanitize_fields=_InstrumentedFastAPI._http_capture_headers_sanitize_fields, + exclude_spans=_InstrumentedFastAPI._exclude_spans, ) self._is_instrumented_by_opentelemetry = True _InstrumentedFastAPI._instrumented_fastapi_apps.add(self)