From 0d6c79bbe374e780ec07663c0d55613e8db81d18 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 23:41:04 +0000 Subject: [PATCH] fix(celery): stop closing prerun_span too soon to account for Celery chains scenario [backport 2.18] (#11806) Backport e8aab659df2df0769586856bdf9f3eaefcfbbb5b from #11498 to 2.18. We've made a few changes to handle celery context recently, including: https://github.com/DataDog/dd-trace-py/pull/10676 In particular the goal of https://github.com/DataDog/dd-trace-py/pull/10676 was to handle a scenario where a long running task may run into an exception, preventing it from closing. Unfortunately, this scenario did not account for cases where tasks are chained and may not close until later. See: https://github.com/DataDog/dd-trace-py/issues/11479 and https://github.com/DataDog/dd-trace-py/issues/11624 With this PR, the sample app in https://github.com/DataDog/dd-trace-py/issues/11479 would attach the celery specific span back to the root span. I also need to add tests for the chains scenario. Related to AIDM-494 ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: wantsui --- ddtrace/contrib/internal/celery/app.py | 11 ---- ddtrace/contrib/internal/celery/signals.py | 3 - ...-celery-closed-spans-34ff43868c1e33b8.yaml | 4 ++ tests/contrib/celery/run_tasks.py | 5 ++ tests/contrib/celery/tasks.py | 14 +++++ tests/contrib/celery/test_chained_task.py | 62 +++++++++++++++++++ 6 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 releasenotes/notes/fix-celery-closed-spans-34ff43868c1e33b8.yaml create mode 100644 tests/contrib/celery/run_tasks.py create mode 100644 tests/contrib/celery/tasks.py create mode 100644 tests/contrib/celery/test_chained_task.py diff --git a/ddtrace/contrib/internal/celery/app.py b/ddtrace/contrib/internal/celery/app.py index b61585097a7..42eed2cb468 100644 --- a/ddtrace/contrib/internal/celery/app.py +++ b/ddtrace/contrib/internal/celery/app.py @@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs): if task_span: task_span.set_exc_info(*sys.exc_info()) - prerun_span = core.get_item("prerun_span") - if prerun_span: - prerun_span.set_exc_info(*sys.exc_info()) - raise finally: task_span = core.get_item("task_span") @@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs): ) task_span.finish() - prerun_span = core.get_item("prerun_span") - if prerun_span: - log.debug( - "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint() - ) - prerun_span.finish() - return _traced_apply_async_inner diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index 76f07ee7524..8f27fcc53b0 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -54,9 +54,6 @@ def trace_prerun(*args, **kwargs): service = config.celery["worker_service_name"] span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER) - # Store an item called "prerun span" in case task_postrun doesn't get called - core.set_item("prerun_span", span) - # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) diff --git a/releasenotes/notes/fix-celery-closed-spans-34ff43868c1e33b8.yaml b/releasenotes/notes/fix-celery-closed-spans-34ff43868c1e33b8.yaml new file mode 100644 index 00000000000..f16f7b36fed --- /dev/null +++ b/releasenotes/notes/fix-celery-closed-spans-34ff43868c1e33b8.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun got closed too soon leading to span tags being missing. \ No newline at end of file diff --git a/tests/contrib/celery/run_tasks.py b/tests/contrib/celery/run_tasks.py new file mode 100644 index 00000000000..e91454ab5bb --- /dev/null +++ b/tests/contrib/celery/run_tasks.py @@ -0,0 +1,5 @@ +from tasks import fn_a +from tasks import fn_b + + +(fn_a.si() | fn_b.si()).delay() diff --git a/tests/contrib/celery/tasks.py b/tests/contrib/celery/tasks.py new file mode 100644 index 00000000000..a9dfc936ae4 --- /dev/null +++ b/tests/contrib/celery/tasks.py @@ -0,0 +1,14 @@ +from celery import Celery + + +app = Celery("tasks") + + +@app.task(name="tests.contrib.celery.tasks.fn_a") +def fn_a(): + return "a" + + +@app.task(name="tests.contrib.celery.tasks.fn_b") +def fn_b(): + return "b" diff --git a/tests/contrib/celery/test_chained_task.py b/tests/contrib/celery/test_chained_task.py new file mode 100644 index 00000000000..5fd0c543e72 --- /dev/null +++ b/tests/contrib/celery/test_chained_task.py @@ -0,0 +1,62 @@ +import os +import re +import subprocess +import time + +from celery import Celery + + +# Ensure that when we call Celery chains, the root span has celery specific span tags +# The test_integration.py setup doesn't perfectly mimic the condition of a worker process running. +# This test runs the worker as a side so we can check the tracer logs afterwards to ensure expected span results. +# See https://github.com/DataDog/dd-trace-py/issues/11479 +def test_task_chain_task_call_task(): + app = Celery("tasks") + + celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo" + celery_task_runner_cmd = "ddtrace-run python run_tasks.py" + + # The commands need to run from the directory where this test file lives + current_directory = str(os.path.dirname(__file__)) + + worker_process = subprocess.Popen( + celery_worker_cmd.split(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid, + close_fds=True, + cwd=current_directory, + ) + + max_wait_time = 10 + waited_so_far = 0 + # {app.control.inspect().active() returns {'celery@uniquename1': []} when the worker is running} + while app.control.inspect().active() is None and waited_so_far < max_wait_time: + time.sleep(1) + waited_so_far += 1 + + # The task should only run after the Celery worker has sufficient time to start up + task_runner_process = subprocess.Popen( + celery_task_runner_cmd.split(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid, + close_fds=True, + cwd=current_directory, + ) + + task_runner_process.wait() + # Kill the process so it starts to send traces to the Trace Agent + worker_process.kill() + worker_logs = worker_process.stderr.read() + + # Check that the root span was created with one of the Celery specific tags, such as celery.correlation_id + # Some versions of python seem to require escaping when using `re.search`: + old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*" + new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*" + + pattern_exists = ( + re.search(old_pattern_match, str(worker_logs)) is not None + or re.search(new_pattern_match, str(worker_logs)) is not None + ) + assert pattern_exists is not None