diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/__init__.py new file mode 100644 index 00000000000..d5a1781b6bf --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/__init__.py @@ -0,0 +1,19 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry.sdk.trace.export._async.async_batch_span_processor import ( + AsyncBatchSpanProcessor, +) + +__all__ = ["AsyncBatchSpanProcessor"] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/async_batch_span_processor.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/async_batch_span_processor.py new file mode 100644 index 00000000000..bcdb6269f0e --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/async_batch_span_processor.py @@ -0,0 +1,213 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +from typing import List, Optional +from logging import getLogger + +from opentelemetry import context as context_api +from opentelemetry.sdk.trace import ReadableSpan, Span +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + SpanExporter, + SpanProcessor, +) +from opentelemetry.sdk.trace.export._async.loop import get_otel_event_loop + +_logger = getLogger(__name__) + + +class _BlockableEvent(asyncio.Event): + """Equivalent to `asyncio.Event` but provides `set_and_wait` which blocks until the event + is cleared""" + + def __init__(self) -> None: + super().__init__() + self._clear_event = asyncio.Event() + + def clear(self) -> None: + super().clear() + self._clear_event.set() + + async def set_and_wait(self) -> None: + """Similar to set() but blocks until the event to cleared (probably by the waiter)""" + self.set() + await self._clear_event.wait() + self._clear_event.clear() + + +async def _wait_event(e: asyncio.Event, timeout: float) -> bool: + """Similar to `Event.wait` but returns a bool instead of throwing + + True indicates the event was set, False that is was not set within timeout + """ + try: + await asyncio.wait_for(e.wait(), timeout) + return True + except TimeoutError: + return False + + +class _Worker: + def __init__( + self, + *, + span_exporter: SpanExporter, + max_queue_size: int, + schedule_delay_millis: float, + max_export_batch_size: int, + export_timeout_millis: float, + ) -> None: + self._span_exporter = span_exporter + self._schedule_delay_millis = schedule_delay_millis + self._max_export_batch_size = max_export_batch_size + self._max_queue_size = max_queue_size + self._export_timeout_millis = export_timeout_millis + self._span_exporter = span_exporter + + self._lqueue: List[ReadableSpan] = [] + self._flush_event = _BlockableEvent() + self._worker_task: asyncio.Task[None] + + async def start(self) -> None: + self._worker_task = asyncio.create_task(self._worker()) + + async def _worker(self) -> None: + while True: + should_flush = await _wait_event( + self._flush_event, self._schedule_delay_millis / 1000 + ) + if should_flush: + await self._drain_queue() + self._flush_event.clear() + continue + + await self._export_batch() + + async def _export_batch(self) -> None: + spans = tuple(self._lqueue[: self._max_export_batch_size]) + self._lqueue = self._lqueue[self._max_export_batch_size :] + + _logger.info("Doing export of %s spans!", len(spans)) + # unfortunately any use of ThreadPoolExecutors doesn't work in atexit hooks. See + # https://github.com/python/cpython/issues/86813. If it fails with RuntimeError, + # try again blocking the event loop + try: + await asyncio.to_thread(self._span_exporter.export, spans) + except RuntimeError: + logging.warn( + "ThreadPoolExecutor was already shutdown, exporting synchronously in event loop thread. " + "This might be running in an atexit hook." + ) + self._span_exporter.export(spans) + + async def _drain_queue(self) -> None: + # do in while loop as more spans could be enqueued while awaiting export + _logger.info( + "_drain_queue() export of %s spans!", len(self._lqueue) + ) + while self._lqueue: + await self._export_batch() + + async def enqueue(self, span: ReadableSpan) -> None: + if len(self._lqueue) > self._max_queue_size: + # drop the span + _logger.info("Queue is full, dropping span!") + return + + self._lqueue.append(span) + + async def shutdown(self) -> None: + self._worker_task.cancel() + try: + await self._worker_task + except asyncio.CancelledError: + _logger.info("worker task cancelled") + pass + _logger.info("Draining queue") + await self._drain_queue() + + async def force_flush(self) -> None: + await self._flush_event.set_and_wait() + + +class AsyncBatchSpanProcessor(SpanProcessor): + def __init__( + self, + span_exporter: SpanExporter, + *, + max_queue_size: Optional[int] = None, + schedule_delay_millis: Optional[float] = None, + max_export_batch_size: Optional[int] = None, + export_timeout_millis: Optional[float] = None, + ): + if max_queue_size is None: + max_queue_size = BatchSpanProcessor._default_max_queue_size() + + if schedule_delay_millis is None: + schedule_delay_millis = ( + BatchSpanProcessor._default_schedule_delay_millis() + ) + + if max_export_batch_size is None: + max_export_batch_size = ( + BatchSpanProcessor._default_max_export_batch_size() + ) + + if export_timeout_millis is None: + export_timeout_millis = ( + BatchSpanProcessor._default_export_timeout_millis() + ) + + BatchSpanProcessor._validate_arguments( + max_queue_size, schedule_delay_millis, max_export_batch_size + ) + + self._worker = _Worker( + span_exporter=span_exporter, + max_queue_size=max_queue_size, + schedule_delay_millis=schedule_delay_millis, + max_export_batch_size=max_export_batch_size, + export_timeout_millis=export_timeout_millis, + ) + asyncio.run_coroutine_threadsafe( + self._worker.start(), get_otel_event_loop() + ).result() + + def on_start( + self, + span: Span, + parent_context: Optional[context_api.Context] = None, + ) -> None: + pass + + def on_end(self, span: "ReadableSpan") -> None: + asyncio.run_coroutine_threadsafe( + self._worker.enqueue(span), get_otel_event_loop() + ) + + def shutdown(self) -> None: + asyncio.run_coroutine_threadsafe( + self._worker.shutdown(), get_otel_event_loop() + ).result() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + try: + asyncio.run_coroutine_threadsafe( + self._worker.force_flush(), get_otel_event_loop() + ).result(timeout=timeout_millis / 1000) + return True + except TimeoutError: + return False diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/loop.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/loop.py new file mode 100644 index 00000000000..ffeb4683d97 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/_async/loop.py @@ -0,0 +1,70 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from logging import getLogger +from typing import Iterable, Optional, List +from opentelemetry.util._once import Once +from threading import current_thread, Thread + +_logger = getLogger(__name__) + +_event_loop: Optional[asyncio.AbstractEventLoop] = None +_create_loop_once = Once() + + +def _create() -> None: + loop = asyncio.new_event_loop() + global _event_loop + _event_loop = loop + + def thread() -> None: + try: + _logger.info("Starting loop in thread %s", current_thread()) + loop.run_forever() + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + + Thread( + name="OTelSharedAsyncioLoopThread", target=thread, daemon=True + ).start() + + +def get_otel_event_loop() -> asyncio.AbstractEventLoop: + # TODO: consider use the existing event loop (asyncio.get_running_loop()) if it is + # available. This would only work if we add async variants to providers' + # shutdown/force_flush methods. Otherwise, it will just deadlock the event loop thread. + + _create_loop_once.do_once(_create) + assert _event_loop is not None + return _event_loop + + +# not current used but maybe needed for redesign +async def next_event(events: Iterable[asyncio.Event]) -> asyncio.Event: + async def wait(event: asyncio.Event) -> asyncio.Event: + await event.wait() + return event + + wait_tasks: List[asyncio.Task] = [] + try: + wait_tasks = [asyncio.create_task(wait(event)) for event in events] + done, _ = await asyncio.wait( + wait_tasks, return_when=asyncio.FIRST_COMPLETED + ) + return done.pop().result() + finally: + for t in wait_tasks: + t.cancel() diff --git a/script.py b/script.py new file mode 100644 index 00000000000..ddd4ae1782f --- /dev/null +++ b/script.py @@ -0,0 +1,46 @@ +import logging +import time + +from opentelemetry.sdk.trace.export import ConsoleSpanExporter +from opentelemetry.sdk.trace.export._async import ( + AsyncBatchSpanProcessor, +) +from opentelemetry.sdk.trace import TracerProvider + +logging.basicConfig(level=logging.DEBUG) + +tp = TracerProvider() +tp.add_span_processor( + AsyncBatchSpanProcessor( + ConsoleSpanExporter(out=open("./spans.log", "w+")), + schedule_delay_millis=1000, + ) +) +t = tp.get_tracer(__name__) + + +def main() -> None: + # write spans and allow them to export + for _ in range(10): + with t.start_as_current_span("foo"): + pass + + time.sleep(2.5) + + # write spans and force flush them immediately + for _ in range(10): + with t.start_as_current_span("flushme"): + pass + + tp.force_flush() + time.sleep(0.5) + + # write spans and allow shutdown to flush them + for _ in range(10): + with t.start_as_current_span("shutmedown"): + pass + + +main() + +tp.shutdown()