diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py index 2d556c8fa5..627003aea5 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py @@ -81,6 +81,7 @@ async def async_consume_hook(span, record, args, kwargs): from opentelemetry.instrumentation.aiokafka.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.semconv.schemas import Schemas class AIOKafkaInstrumentor(BaseInstrumentor): @@ -108,7 +109,7 @@ def _instrument(self, **kwargs): __name__, __version__, tracer_provider=tracer_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", + schema_url=Schemas.V1_27_0.value, ) wrap_function_wrapper( diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py index ebf78944b3..7e5d989eb7 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py @@ -8,7 +8,8 @@ from opentelemetry import context, propagate, trace from opentelemetry.context import Context from opentelemetry.propagators import textmap -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.semconv._incubating.attributes import messaging_attributes +from opentelemetry.semconv.attributes import server_attributes from opentelemetry.trace import Tracer from opentelemetry.trace.span import Span @@ -21,6 +22,16 @@ def _extract_bootstrap_servers( return client._bootstrap_servers +def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str: + return client._client_id + + +def _extract_consumer_group( + consumer: aiokafka.AIOKafkaConsumer, +) -> Optional[str]: + return consumer._group_id + + def _extract_argument( key: str, position: int, @@ -128,23 +139,108 @@ def set( _aiokafka_setter = AIOKafkaContextSetter() -def _enrich_span( +def _enrich_base_span( + span: Span, + *, + bootstrap_servers: Union[str, List[str]], + client_id: str, + topic: str, +) -> None: + span.set_attribute( + messaging_attributes.MESSAGING_SYSTEM, + messaging_attributes.MessagingSystemValues.KAFKA.value, + ) + span.set_attribute( + server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers) + ) + span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id) + span.set_attribute(messaging_attributes.MESSAGING_DESTINATION_NAME, topic) + + +def _enrich_send_span( span: Span, + *, bootstrap_servers: Union[str, List[str]], + client_id: str, topic: str, partition: Optional[int], + key: Optional[str], ) -> None: if not span.is_recording(): return - span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") - span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) + _enrich_base_span( + span, + bootstrap_servers=bootstrap_servers, + client_id=client_id, + topic=topic, + ) if partition is not None: - span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) + span.set_attribute( + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID, + str(partition), + ) + span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send") span.set_attribute( - SpanAttributes.MESSAGING_URL, json.dumps(bootstrap_servers) + messaging_attributes.MESSAGING_OPERATION_TYPE, + messaging_attributes.MessagingOperationTypeValues.PUBLISH.value, + ) + + if key is not None: + span.set_attribute( + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, str(key) + ) + + +def _enrich_anext_span( + span: Span, + *, + bootstrap_servers: Union[str, List[str]], + client_id: str, + consumer_group: Optional[str], + topic: str, + partition: Optional[int], + key: Optional[str], + offset: int, +) -> None: + if not span.is_recording(): + return + + _enrich_base_span( + span, + bootstrap_servers=bootstrap_servers, + client_id=client_id, + topic=topic, + ) + + if consumer_group is not None: + span.set_attribute( + messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group + ) + + if partition is not None: + span.set_attribute( + messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID, + str(partition), + ) + + span.set_attribute( + messaging_attributes.MESSAGING_OPERATION_NAME, "receive" + ) + span.set_attribute( + messaging_attributes.MESSAGING_OPERATION_TYPE, + messaging_attributes.MessagingOperationTypeValues.RECEIVE.value, + ) + + if key is not None: + span.set_attribute( + messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key + ) + + span.set_attribute( + messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset ) @@ -168,12 +264,21 @@ async def _traced_send( topic = _extract_send_topic(args, kwargs) bootstrap_servers = _extract_bootstrap_servers(instance.client) + client_id = _extract_client_id(instance.client) + key = _extract_send_key(args, kwargs) partition = await _extract_send_partition(instance, args, kwargs) span_name = _get_span_name("send", topic) with tracer.start_as_current_span( span_name, kind=trace.SpanKind.PRODUCER ) as span: - _enrich_span(span, bootstrap_servers, topic, partition) + _enrich_send_span( + span, + bootstrap_servers=bootstrap_servers, + client_id=client_id, + topic=topic, + partition=partition, + key=key, + ) propagate.inject( headers, context=trace.set_span_in_context(span), @@ -196,6 +301,8 @@ async def _create_consumer_span( record: ConsumerRecord, extracted_context: Context, bootstrap_servers: Union[str, List[str]], + client_id: str, + consumer_group: Optional[str], args: Tuple[Any], kwargs: Dict[str, Any], ): @@ -207,7 +314,16 @@ async def _create_consumer_span( ) as span: new_context = trace.set_span_in_context(span, extracted_context) token = context.attach(new_context) - _enrich_span(span, bootstrap_servers, record.topic, record.partition) + _enrich_anext_span( + span, + bootstrap_servers=bootstrap_servers, + client_id=client_id, + consumer_group=consumer_group, + topic=record.topic, + partition=record.partition, + key=record.key, + offset=record.offset, + ) try: if callable(async_consume_hook): await async_consume_hook(span, record, args, kwargs) @@ -229,6 +345,8 @@ async def _traced_next( if record: bootstrap_servers = _extract_bootstrap_servers(instance._client) + client_id = _extract_client_id(instance._client) + consumer_group = _extract_consumer_group(instance) extracted_context = propagate.extract( record.headers, getter=_aiokafka_getter @@ -239,6 +357,8 @@ async def _traced_next( record, extracted_context, bootstrap_servers, + client_id, + consumer_group, args, kwargs, ) diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py index edf81e619c..b1b2792608 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_utils.py @@ -59,7 +59,9 @@ def test_context_getter(self) -> None: @mock.patch( "opentelemetry.instrumentation.aiokafka.utils._extract_send_partition" ) - @mock.patch("opentelemetry.instrumentation.aiokafka.utils._enrich_span") + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._enrich_send_span" + ) @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.propagate.inject") async def test_wrap_send_with_topic_as_arg( @@ -84,7 +86,9 @@ async def test_wrap_send_with_topic_as_arg( @mock.patch( "opentelemetry.instrumentation.aiokafka.utils._extract_send_partition" ) - @mock.patch("opentelemetry.instrumentation.aiokafka.utils._enrich_span") + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._enrich_send_span" + ) @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.propagate.inject") async def test_wrap_send_with_topic_as_kwarg( @@ -137,9 +141,11 @@ async def wrap_send_helper( span = tracer.start_as_current_span().__enter__.return_value enrich_span.assert_called_once_with( span, - extract_bootstrap_servers.return_value, - self.topic_name, - extract_send_partition.return_value, + bootstrap_servers=extract_bootstrap_servers.return_value, + client_id=kafka_producer.client._client_id, + topic=self.topic_name, + partition=extract_send_partition.return_value, + key=None, ) set_span_in_context.assert_called_once_with(span) @@ -162,8 +168,16 @@ async def wrap_send_helper( @mock.patch( "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers" ) + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._extract_client_id" + ) + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group" + ) async def test_wrap_next( self, + extract_consumer_group: mock.MagicMock, + extract_client_id: mock.MagicMock, extract_bootstrap_servers: mock.MagicMock, _create_consumer_span: mock.MagicMock, extract: mock.MagicMock, @@ -183,6 +197,12 @@ async def test_wrap_next( ) bootstrap_servers = extract_bootstrap_servers.return_value + extract_client_id.assert_called_once_with(kafka_consumer._client) + client_id = extract_client_id.return_value + + extract_consumer_group.assert_called_once_with(kafka_consumer) + consumer_group = extract_consumer_group.return_value + original_next_callback.assert_awaited_once_with( *self.args, **self.kwargs ) @@ -199,13 +219,17 @@ async def test_wrap_next( record, context, bootstrap_servers, + client_id, + consumer_group, self.args, self.kwargs, ) @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.context.attach") - @mock.patch("opentelemetry.instrumentation.aiokafka.utils._enrich_span") + @mock.patch( + "opentelemetry.instrumentation.aiokafka.utils._enrich_anext_span" + ) @mock.patch("opentelemetry.context.detach") async def test_create_consumer_span( self, @@ -219,6 +243,8 @@ async def test_create_consumer_span( bootstrap_servers = mock.MagicMock() extracted_context = mock.MagicMock() record = mock.MagicMock() + client_id = mock.MagicMock() + consumer_group = mock.MagicMock() await _create_consumer_span( tracer, @@ -226,6 +252,8 @@ async def test_create_consumer_span( record, extracted_context, bootstrap_servers, + client_id, + consumer_group, self.args, self.kwargs, ) @@ -242,7 +270,14 @@ async def test_create_consumer_span( attach.assert_called_once_with(set_span_in_context.return_value) enrich_span.assert_called_once_with( - span, bootstrap_servers, record.topic, record.partition + span, + bootstrap_servers=bootstrap_servers, + client_id=client_id, + consumer_group=consumer_group, + topic=record.topic, + partition=record.partition, + key=record.key, + offset=record.offset, ) consume_hook.assert_awaited_once_with( span, record, self.args, self.kwargs