Skip to content

Commit

Permalink
Bugfix/check future cancelled (#2461)
Browse files Browse the repository at this point in the history
* Calling the exception() method when future is in the cancelled state is causing a CancelledError

Calling the exception() method when future is in the cancelled state is causing a CancelledError. we should check the cancelled state first and call f.exception() only if it's not cancelled.

* modify lint

* modify lint

* Update CHANGELOG.md

* remove init()

* add future cancelled test code

* add future cancelled test code

* add future cancelled test code

* add future cancelled test code

* add future cancelled test code

* add future cancelled test code

* lint

* lint

* remove if condition

* modify test code

* lint

* lint

* remove pytest

---------

Co-authored-by: Diego Hurtado <[email protected]>
  • Loading branch information
bourbonkk and ocelotl authored May 7, 2024
1 parent 0a231e5 commit bc804a3
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2418](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2418))
- Use sqlalchemy version in sqlalchemy commenter instead of opentelemetry library version
([#2404](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2404))
- `opentelemetry-instrumentation-asyncio` Check for cancelledException in the future
([#2461](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2461))
- Remove SDK dependency from opentelemetry-instrumentation-grpc
([#2474](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2474))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,11 @@ class AsyncioInstrumentor(BaseInstrumentor):
"run_coroutine_threadsafe",
]

def __init__(self):
super().__init__()
self.process_duration_histogram = None
self.process_created_counter = None

self._tracer = None
self._meter = None
self._coros_name_to_trace: set = set()
self._to_thread_name_to_trace: set = set()
self._future_active_enabled: bool = False

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
)
Expand Down Expand Up @@ -307,13 +297,17 @@ def trace_future(self, future):
)

def callback(f):
exception = f.exception()
attr = {
"type": "future",
"state": (
"cancelled"
if f.cancelled()
else determine_state(f.exception())
),
}
state = determine_state(exception)
attr["state"] = state
self.record_process(start, attr, span, exception)
self.record_process(
start, attr, span, None if f.cancelled() else f.exception()
)

future.add_done_callback(callback)
return future
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import asyncio
from unittest.mock import patch

from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
from opentelemetry.instrumentation.asyncio.environment_variables import (
OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED,
)
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import get_tracer


class TestTraceFuture(TestBase):
@patch.dict(
"os.environ", {OTEL_PYTHON_ASYNCIO_FUTURE_TRACE_ENABLED: "true"}
)
def setUp(self):
super().setUp()
self._tracer = get_tracer(
__name__,
)
self.instrumentor = AsyncioInstrumentor()
self.instrumentor.instrument()

def tearDown(self):
super().tearDown()
self.instrumentor.uninstrument()

def test_trace_future_cancelled(self):
async def future_cancelled():
with self._tracer.start_as_current_span("root"):
future = asyncio.Future()
future = self.instrumentor.trace_future(future)
future.cancel()

try:
asyncio.run(future_cancelled())
except asyncio.CancelledError as exc:
self.assertEqual(isinstance(exc, asyncio.CancelledError), True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
self.assertEqual(spans[0].name, "root")
self.assertEqual(spans[1].name, "asyncio future")

metrics = (
self.memory_metrics_reader.get_metrics_data()
.resource_metrics[0]
.scope_metrics[0]
.metrics
)
self.assertEqual(len(metrics), 2)

self.assertEqual(metrics[0].name, "asyncio.process.duration")
self.assertEqual(
metrics[0].data.data_points[0].attributes["state"], "cancelled"
)

self.assertEqual(metrics[1].name, "asyncio.process.created")
self.assertEqual(
metrics[1].data.data_points[0].attributes["state"], "cancelled"
)

0 comments on commit bc804a3

Please sign in to comment.