diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d0171ab5d..276c23410d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -117,6 +117,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3610](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3610)) - infra(ci): Fix git pull failures in core contrib test ([#3357](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3357)) +- `opentelemetry-instrumentation-celery`: Implement new messaging semantic convention opt-in in celery instrumentation and add corresponding tests ([#3712](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3712)) ### Added diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 8a13278b7b..3904d32732 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -69,6 +69,14 @@ def add(x, y): from opentelemetry import context as context_api from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + _get_schema_url, + _OpenTelemetrySemanticConventionStability, + _OpenTelemetryStabilitySignalType, + _report_new, + _report_old, + _StabilityMode, +) from opentelemetry.instrumentation.celery import utils from opentelemetry.instrumentation.celery.package import _instruments from opentelemetry.instrumentation.celery.version import __version__ @@ -76,6 +84,7 @@ def add(x, y): from opentelemetry.metrics import get_meter from opentelemetry.propagate import extract, inject from opentelemetry.propagators.textmap import Getter +from opentelemetry.semconv._incubating.attributes import messaging_attributes from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace.status import Status, StatusCode @@ -116,6 +125,7 @@ def keys(self, carrier): class CeleryInstrumentor(BaseInstrumentor): metrics = None task_id_to_start_time = {} + _sem_conv_opt_in_mode = _StabilityMode.DEFAULT def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -123,12 +133,16 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") + self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.MESSAGING + ) + # pylint: disable=attribute-defined-outside-init self._tracer = trace.get_tracer( __name__, __version__, tracer_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", + schema_url=_get_schema_url(self._sem_conv_opt_in_mode), ) meter_provider = kwargs.get("meter_provider") @@ -136,7 +150,7 @@ def _instrument(self, **kwargs): __name__, __version__, meter_provider, - schema_url="https://opentelemetry.io/schemas/1.11.0", + schema_url=_get_schema_url(self._sem_conv_opt_in_mode), ) self.create_celery_metrics(meter) @@ -204,8 +218,12 @@ def _trace_postrun(self, *args, **kwargs): # request context tags if span.is_recording(): span.set_attribute(_TASK_TAG_KEY, _TASK_RUN) - utils.set_attributes_from_context(span, kwargs) - utils.set_attributes_from_context(span, task.request) + utils.set_attributes_from_context( + span, kwargs, self._sem_conv_opt_in_mode + ) + utils.set_attributes_from_context( + span, task.request, self._sem_conv_opt_in_mode + ) span.set_attribute(_TASK_NAME_KEY, task.name) activation.__exit__(None, None, None) @@ -240,9 +258,18 @@ def _trace_before_publish(self, *args, **kwargs): # apply some attributes here because most of the data is not available if span.is_recording(): span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) - span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id) + if _report_new(self._sem_conv_opt_in_mode): + span.set_attribute( + messaging_attributes.MESSAGING_MESSAGE_ID, task_id + ) # Not necessary since it has the same name as the old attribute but just in case it changes in the future + if _report_old(self._sem_conv_opt_in_mode): + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, task_id + ) span.set_attribute(_TASK_NAME_KEY, task_name) - utils.set_attributes_from_context(span, kwargs) + utils.set_attributes_from_context( + span, kwargs, self._sem_conv_opt_in_mode + ) activation = trace.use_span(span, end_on_exit=True) activation.__enter__() # pylint: disable=E1101 diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index d7ca77af8a..e377f64925 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -20,6 +20,12 @@ from celery import registry # pylint: disable=no-name-in-module from celery.app.task import Task +from opentelemetry.instrumentation._semconv import ( + _report_new, + _report_old, + _StabilityMode, +) +from opentelemetry.semconv._incubating.attributes import messaging_attributes from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import Span @@ -56,7 +62,11 @@ # pylint:disable=too-many-branches -def set_attributes_from_context(span, context): +def set_attributes_from_context( + span, + context, + sem_conv_opt_in_mode: _StabilityMode = _StabilityMode.DEFAULT, +): """Helper to extract meta values from a Celery Context""" if not span.is_recording(): return @@ -80,7 +90,7 @@ def set_attributes_from_context(span, context): continue attribute_name = None - + new_attribute_name = None # Celery 4.0 uses `origin` instead of `hostname`; this change preserves # the same name for the tag despite Celery version if key == "origin": @@ -91,20 +101,39 @@ def set_attributes_from_context(span, context): routing_key = value.get("routing_key") if routing_key is not None: - span.set_attribute( - SpanAttributes.MESSAGING_DESTINATION, routing_key - ) + if _report_new(sem_conv_opt_in_mode): + span.set_attribute( + messaging_attributes.MESSAGING_DESTINATION_NAME, + routing_key, + ) + if _report_old(sem_conv_opt_in_mode): + span.set_attribute( + SpanAttributes.MESSAGING_DESTINATION, routing_key + ) value = str(value) elif key == "id": - attribute_name = SpanAttributes.MESSAGING_MESSAGE_ID + if _report_new(sem_conv_opt_in_mode): + new_attribute_name = messaging_attributes.MESSAGING_MESSAGE_ID + if _report_old(sem_conv_opt_in_mode): + attribute_name = SpanAttributes.MESSAGING_MESSAGE_ID elif key == "correlation_id": - attribute_name = SpanAttributes.MESSAGING_CONVERSATION_ID + if _report_new(sem_conv_opt_in_mode): + new_attribute_name = ( + messaging_attributes.MESSAGING_MESSAGE_CONVERSATION_ID + ) + if _report_old(sem_conv_opt_in_mode): + attribute_name = SpanAttributes.MESSAGING_CONVERSATION_ID elif key == "routing_key": - attribute_name = SpanAttributes.MESSAGING_DESTINATION + if _report_new(sem_conv_opt_in_mode): + new_attribute_name = ( + messaging_attributes.MESSAGING_DESTINATION_NAME + ) + if _report_old(sem_conv_opt_in_mode): + attribute_name = SpanAttributes.MESSAGING_DESTINATION # according to https://docs.celeryproject.org/en/stable/userguide/routing.html#exchange-types elif key == "declare": @@ -117,11 +146,13 @@ def set_attributes_from_context(span, context): value = "topic" break + if attribute_name: + span.set_attribute(attribute_name, value) + if new_attribute_name: + span.set_attribute(new_attribute_name, value) # set attribute name if not set specially for a key - if attribute_name is None: - attribute_name = f"celery.{key}" - - span.set_attribute(attribute_name, value) + if attribute_name is None and new_attribute_name is None: + span.set_attribute(f"celery.{key}", value) def attach_context( diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index c68b1bc758..c0dd8f029c 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -14,12 +14,21 @@ import threading import time +from unittest import mock from wrapt import wrap_function_wrapper from opentelemetry import baggage, context +from opentelemetry.instrumentation._semconv import ( + OTEL_SEMCONV_STABILITY_OPT_IN, + _OpenTelemetrySemanticConventionStability, +) from opentelemetry.instrumentation.celery import CeleryInstrumentor, utils from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.semconv._incubating.attributes import ( + exception_attributes, + messaging_attributes, +) from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase from opentelemetry.trace import SpanKind, StatusCode @@ -30,6 +39,7 @@ class TestCeleryInstrumentation(TestBase): def setUp(self): super().setUp() + _OpenTelemetrySemanticConventionStability._initialized = False # to have consistent behavior accross tests since this attribute ensures that initialization happens only once self._worker = app.Worker(app=app, pool="solo", concurrency=1) self._thread = threading.Thread(target=self._worker.start) self._thread.daemon = True @@ -222,6 +232,211 @@ def _retrieve_context_wrapper_none_token( unwrap(utils, "retrieve_context") + def test_task_new_sem_conv(self): + with mock.patch.dict( + "os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "messaging"} + ): + CeleryInstrumentor().instrument() + + result = task_add.delay(1, 2) + timeout = time.time() + 60 * 1 # 1 minutes from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans( + self.memory_exporter.get_finished_spans() + ) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual( + consumer.name, "run/tests.celery_test_tasks.task_add" + ) + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + messaging_attributes.MESSAGING_DESTINATION_NAME: "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + + self.assertEqual(consumer.status.status_code, StatusCode.UNSET) + + self.assertEqual(0, len(consumer.events)) + + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_add" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assertSpanHasAttributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_add", + messaging_attributes.MESSAGING_DESTINATION_NAME: "celery", + }, + ) + + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual( + consumer.context.trace_id, producer.context.trace_id + ) + + def test_task_raises_new_sem_conv(self): + with mock.patch.dict( + "os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "messaging"} + ): + CeleryInstrumentor().instrument() + + result = task_raises.delay() + + timeout = time.time() + 60 * 1 # 1 minutes from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans( + self.memory_exporter.get_finished_spans() + ) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual( + consumer.name, "run/tests.celery_test_tasks.task_raises" + ) + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "FAILURE", + messaging_attributes.MESSAGING_DESTINATION_NAME: "celery", + "celery.task_name": "tests.celery_test_tasks.task_raises", + }, + ) + + self.assertEqual(consumer.status.status_code, StatusCode.ERROR) + + self.assertEqual(1, len(consumer.events)) + event = consumer.events[0] + + self.assertIn( + exception_attributes.EXCEPTION_STACKTRACE, event.attributes + ) + + self.assertEqual( + "tests.celery_test_tasks.CustomError", + event.attributes[exception_attributes.EXCEPTION_TYPE], + ) + + self.assertEqual( + event.attributes[exception_attributes.EXCEPTION_MESSAGE], + "The task failed!", + ) + + self.assertEqual( + producer.name, + "apply_async/tests.celery_test_tasks.task_raises", + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assertSpanHasAttributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_raises", + messaging_attributes.MESSAGING_DESTINATION_NAME: "celery", + }, + ) + + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual( + consumer.context.trace_id, producer.context.trace_id + ) + + def test_task_both_sem_conv(self): + with mock.patch.dict( + "os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "messaging/dup"} + ): + CeleryInstrumentor().instrument() + + result = task_add.delay(1, 2) + timeout = time.time() + 60 * 1 # 1 minutes from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans( + self.memory_exporter.get_finished_spans() + ) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual( + consumer.name, "run/tests.celery_test_tasks.task_add" + ) + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + messaging_attributes.MESSAGING_DESTINATION_NAME: "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + SpanAttributes.MESSAGING_DESTINATION: "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + + self.assertEqual(consumer.status.status_code, StatusCode.UNSET) + + self.assertEqual(0, len(consumer.events)) + + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_add" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assertSpanHasAttributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_add", + messaging_attributes.MESSAGING_DESTINATION_NAME: "celery", + }, + ) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + SpanAttributes.MESSAGING_DESTINATION: "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual( + consumer.context.trace_id, producer.context.trace_id + ) + class TestCelerySignatureTask(TestBase): def setUp(self): diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py index a2f6e4338c..986072f2f3 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_utils.py @@ -55,7 +55,8 @@ def test_set_attributes_from_context(self): "44b7f305", ) self.assertEqual( - span.attributes.get(SpanAttributes.MESSAGING_DESTINATION), "celery" + span.attributes.get(SpanAttributes.MESSAGING_DESTINATION), + "celery", ) self.assertEqual( diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_semconv.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_semconv.py index 1b1748e206..a15f16bedd 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_semconv.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/_semconv.py @@ -166,6 +166,7 @@ class _OpenTelemetryStabilitySignalType(Enum): HTTP = "http" DATABASE = "database" GEN_AI = "gen_ai" + MESSAGING = "messaging" class _StabilityMode(Enum): @@ -175,6 +176,8 @@ class _StabilityMode(Enum): DATABASE = "database" DATABASE_DUP = "database/dup" GEN_AI_LATEST_EXPERIMENTAL = "gen_ai_latest_experimental" + MESSAGING = "messaging" + MESSAGING_DUP = "messaging/dup" def _report_new(mode: _StabilityMode): @@ -182,7 +185,11 @@ def _report_new(mode: _StabilityMode): def _report_old(mode: _StabilityMode): - return mode not in (_StabilityMode.HTTP, _StabilityMode.DATABASE) + return mode not in ( + _StabilityMode.HTTP, + _StabilityMode.DATABASE, + _StabilityMode.MESSAGING, + ) class _OpenTelemetrySemanticConventionStability: @@ -206,6 +213,7 @@ def _initialize(cls): _OpenTelemetryStabilitySignalType.HTTP: _StabilityMode.DEFAULT, _OpenTelemetryStabilitySignalType.DATABASE: _StabilityMode.DEFAULT, _OpenTelemetryStabilitySignalType.GEN_AI: _StabilityMode.DEFAULT, + _OpenTelemetryStabilitySignalType.MESSAGING: _StabilityMode.DEFAULT, } cls._initialized = True return @@ -233,6 +241,15 @@ def _initialize(cls): _StabilityMode.DATABASE, _StabilityMode.DATABASE_DUP, ) + + cls._OTEL_SEMCONV_STABILITY_SIGNAL_MAPPING[ + _OpenTelemetryStabilitySignalType.MESSAGING + ] = cls._filter_mode( + opt_in_list, + _StabilityMode.MESSAGING, + _StabilityMode.MESSAGING_DUP, + ) + cls._initialized = True @staticmethod