diff --git a/elasticapm/base.py b/elasticapm/base.py index 6f79f2d1d..d33ba6231 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -41,6 +41,7 @@ import time import warnings from copy import deepcopy +from typing import Optional, Tuple import elasticapm from elasticapm.conf import Config, VersionedConfig, constants @@ -617,6 +618,20 @@ def check_python_version(self): elif v < (3, 5): warnings.warn("The Elastic APM agent only supports Python 3.5+", DeprecationWarning) + def check_server_version(self, gte: Optional[Tuple[int]] = None, lte: Optional[Tuple[int]] = None) -> bool: + """ + Check APM Server version against greater-or-equal and/or lower-or-equal limits, provided as tuples of integers. + If server_version is not set, always returns True. + :param gte: a tuple of ints describing the greater-or-equal limit, e.g. (7, 16) + :param lte: a tuple of ints describing the lower-or-equal limit, e.g. (7, 99) + :return: bool + """ + if not self.server_version: + return True + gte = gte or (0,) + lte = lte or (2 ** 32,) # let's assume APM Server version will never be greater than 2^32 + return bool(gte <= self.server_version <= lte) + class DummyClient(Client): """Sends messages into an empty void""" diff --git a/elasticapm/traces.py b/elasticapm/traces.py index 8cce0eee8..bf352ef3c 100644 --- a/elasticapm/traces.py +++ b/elasticapm/traces.py @@ -83,10 +83,12 @@ def duration(self): class BaseSpan(object): - def __init__(self, labels=None): + def __init__(self, labels=None, start=None): self._child_durations = ChildDuration(self) self.labels = {} self.outcome = None + self.start_time = start or _time_func() + self.duration = None if labels: self.label(**labels) @@ -97,7 +99,7 @@ def child_ended(self, timestamp): self._child_durations.stop(timestamp) def end(self, skip_frames=0, duration=None): - raise NotImplementedError() + self.duration = duration if duration is not None else (_time_func() - self.start_time) def label(self, **labels): """ @@ -131,12 +133,8 @@ def __init__( ): self.id = self.get_dist_tracing_id() self.trace_parent = trace_parent - if start: - self.timestamp = self.start_time = start - else: - self.timestamp, self.start_time = time.time(), _time_func() + self.timestamp = start if start is not None else time.time() self.name = None - self.duration = None self.result = None self.transaction_type = transaction_type self.tracer = tracer @@ -149,6 +147,7 @@ def __init__( self._span_counter = 0 self._span_timers = defaultdict(Timer) self._span_timers_lock = threading.Lock() + self._dropped_span_statistics = defaultdict(lambda: {"count": 0, "duration.sum.us": 0}) try: self._breakdown = self.tracer._agent._metrics.get_metricset( "elasticapm.metrics.sets.breakdown.BreakdownMetricSet" @@ -164,7 +163,7 @@ def __init__( super(Transaction, self).__init__() def end(self, skip_frames=0, duration=None): - self.duration = duration if duration is not None else (_time_func() - self.start_time) + super().end(skip_frames, duration) if self._transaction_metrics: self._transaction_metrics.timer( "transaction.duration", @@ -214,7 +213,7 @@ def _begin_span( span = DroppedSpan(parent_span, leaf=True) elif tracer.config.transaction_max_spans and self._span_counter > tracer.config.transaction_max_spans - 1: self.dropped_spans += 1 - span = DroppedSpan(parent_span) + span = DroppedSpan(parent_span, context=context) self._span_counter += 1 else: span = Span( @@ -318,6 +317,15 @@ def to_dict(self): "sampled": self.is_sampled, "span_count": {"started": self._span_counter - self.dropped_spans, "dropped": self.dropped_spans}, } + if self._dropped_span_statistics: + result["dropped_spans_stats"] = [ + { + "destination_service_resource": resource, + "outcome": outcome, + "duration": {"count": v["count"], "sum": {"us": int(v["duration.sum.us"] * 1000000)}}, + } + for (resource, outcome), v in self._dropped_span_statistics.items() + ] if self.sample_rate is not None: result["sample_rate"] = float(self.sample_rate) if self.trace_parent: @@ -405,7 +413,6 @@ def __init__( :param sync: indicate if the span was executed synchronously or asynchronously :param start: timestamp, mostly useful for testing """ - self.start_time = start or _time_func() self.id = self.get_dist_tracing_id() self.transaction = transaction self.name = name @@ -415,26 +422,19 @@ def __init__( # we take the (non-monotonic) transaction timestamp, and add the (monotonic) difference of span # start time and transaction start time. In this respect, the span timestamp is guaranteed to grow # monotonically with respect to the transaction timestamp - self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time) self.duration = None self.parent = parent self.parent_span_id = parent_span_id self.frames = None self.sync = sync - if span_subtype is None and "." in span_type: - # old style dottet type, let's split it up - type_bits = span_type.split(".") - if len(type_bits) == 2: - span_type, span_subtype = type_bits[:2] - else: - span_type, span_subtype, span_action = type_bits[:3] self.type = span_type self.subtype = span_subtype self.action = span_action + super(Span, self).__init__(labels=labels, start=start) + self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time) if self.transaction._breakdown: p = self.parent if self.parent else self.transaction p.child_started(self.start_time) - super(Span, self).__init__(labels=labels) def to_dict(self): result = { @@ -473,9 +473,8 @@ def end(self, skip_frames=0, duration=None): :param duration: override duration, mostly useful for testing :return: None """ + super().end(skip_frames, duration) tracer = self.transaction.tracer - timestamp = _time_func() - self.duration = duration if duration is not None else (timestamp - self.start_time) if not tracer.span_frames_min_duration or self.duration >= tracer.span_frames_min_duration and self.frames: self.frames = tracer.frames_processing_func(self.frames)[skip_frames:] else: @@ -505,15 +504,18 @@ def __str__(self): class DroppedSpan(BaseSpan): - __slots__ = ("leaf", "parent", "id") + __slots__ = ("leaf", "parent", "id", "context", "outcome") - def __init__(self, parent, leaf=False): + def __init__(self, parent, leaf=False, start=None, context=None): self.parent = parent self.leaf = leaf self.id = None - super(DroppedSpan, self).__init__() + self.context = context + self.outcome = constants.OUTCOME.UNKNOWN + super(DroppedSpan, self).__init__(start=start) def end(self, skip_frames=0, duration=None): + super().end(skip_frames, duration) execution_context.set_span(self.parent) def child_started(self, timestamp): @@ -537,18 +539,6 @@ def subtype(self): def action(self): return None - @property - def context(self): - return None - - @property - def outcome(self): - return "unknown" - - @outcome.setter - def outcome(self, value): - return - class Tracer(object): def __init__(self, frames_collector_func, frames_processing_func, queue_func, config, agent): @@ -664,6 +654,13 @@ def __init__( sync=None, ): self.name = name + if span_subtype is None and "." in span_type: + # old style dotted type, let's split it up + type_bits = span_type.split(".") + if len(type_bits) == 2: + span_type, span_subtype = type_bits[:2] + else: + span_type, span_subtype, span_action = type_bits[:3] self.type = span_type self.subtype = span_subtype self.action = span_action @@ -707,6 +704,17 @@ def __exit__(self, exc_type, exc_val, exc_tb): try: outcome = "failure" if exc_val else "success" span = transaction.end_span(self.skip_frames, duration=self.duration, outcome=outcome) + should_send = ( + transaction.tracer._agent.check_server_version(gte=(7, 16)) if transaction.tracer._agent else True + ) + if should_send and isinstance(span, DroppedSpan) and span.context: + try: + resource = span.context["destination"]["service"]["resource"] + stats = transaction._dropped_span_statistics[(resource, span.outcome)] + stats["count"] += 1 + stats["duration.sum.us"] += span.duration + except KeyError: + pass if exc_val and not isinstance(span, DroppedSpan): try: exc_val._elastic_apm_span_id = span.id diff --git a/tests/client/client_tests.py b/tests/client/client_tests.py index 492004b43..2fd83d795 100644 --- a/tests/client/client_tests.py +++ b/tests/client/client_tests.py @@ -543,28 +543,6 @@ def test_transaction_sample_rate_dynamic(elasticapm_client, not_so_random): assert len([t for t in transactions if t["sampled"]]) == 8 -@pytest.mark.parametrize("elasticapm_client", [{"transaction_max_spans": 5}], indirect=True) -def test_transaction_max_spans(elasticapm_client): - elasticapm_client.begin_transaction("test_type") - for i in range(5): - with elasticapm.capture_span("nodrop"): - pass - for i in range(10): - with elasticapm.capture_span("drop"): - pass - transaction_obj = elasticapm_client.end_transaction("test") - - transaction = elasticapm_client.events[TRANSACTION][0] - spans = elasticapm_client.events[SPAN] - assert all(span["transaction_id"] == transaction["id"] for span in spans) - - assert transaction_obj.dropped_spans == 10 - assert len(spans) == 5 - for span in spans: - assert span["name"] == "nodrop" - assert transaction["span_count"] == {"dropped": 10, "started": 5} - - def test_transaction_max_spans_dynamic(elasticapm_client): elasticapm_client.config.update(version=1, transaction_max_spans=1) elasticapm_client.begin_transaction("test_type") @@ -662,35 +640,6 @@ def test_transaction_span_frames_min_duration_dynamic(elasticapm_client): assert spans[3]["stacktrace"] is not None -@pytest.mark.parametrize("elasticapm_client", [{"transaction_max_spans": 3}], indirect=True) -def test_transaction_max_span_nested(elasticapm_client): - elasticapm_client.begin_transaction("test_type") - with elasticapm.capture_span("1"): - with elasticapm.capture_span("2"): - with elasticapm.capture_span("3"): - with elasticapm.capture_span("4"): - with elasticapm.capture_span("5"): - pass - with elasticapm.capture_span("6"): - pass - with elasticapm.capture_span("7"): - pass - with elasticapm.capture_span("8"): - pass - with elasticapm.capture_span("9"): - pass - transaction_obj = elasticapm_client.end_transaction("test") - - transaction = elasticapm_client.events[TRANSACTION][0] - spans = elasticapm_client.events[SPAN] - - assert transaction_obj.dropped_spans == 6 - assert len(spans) == 3 - for span in spans: - assert span["name"] in ("1", "2", "3") - assert transaction["span_count"] == {"dropped": 6, "started": 3} - - def test_transaction_keyword_truncation(elasticapm_client): too_long = "x" * (KEYWORD_MAX_LENGTH + 1) expected = encoding.keyword_field(too_long) @@ -894,3 +843,20 @@ def test_excepthook(elasticapm_client): elasticapm_client._excepthook(type_, value, traceback) assert elasticapm_client.events[ERROR] + + +def test_check_server_version(elasticapm_client): + assert elasticapm_client.server_version is None + assert elasticapm_client.check_server_version(gte=(100, 5, 10)) + assert elasticapm_client.check_server_version(lte=(100, 5, 10)) + + elasticapm_client.server_version = (7, 15) + assert elasticapm_client.check_server_version(gte=(7,)) + assert not elasticapm_client.check_server_version(gte=(8,)) + assert not elasticapm_client.check_server_version(lte=(7,)) + assert elasticapm_client.check_server_version(lte=(8,)) + assert elasticapm_client.check_server_version(gte=(7, 12), lte=(7, 15)) + assert elasticapm_client.check_server_version(gte=(7, 15), lte=(7, 15)) + assert elasticapm_client.check_server_version(gte=(7, 15), lte=(7, 16)) + assert not elasticapm_client.check_server_version(gte=(7, 12), lte=(7, 13)) + assert not elasticapm_client.check_server_version(gte=(7, 16), lte=(7, 18)) diff --git a/tests/client/dropped_spans_tests.py b/tests/client/dropped_spans_tests.py new file mode 100644 index 000000000..100a06465 --- /dev/null +++ b/tests/client/dropped_spans_tests.py @@ -0,0 +1,121 @@ +# BSD 3-Clause License +# +# Copyright (c) 2021, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import pytest + +import elasticapm +from elasticapm.conf import constants + + +@pytest.mark.parametrize("elasticapm_client", [{"transaction_max_spans": 5}], indirect=True) +def test_transaction_max_spans(elasticapm_client): + elasticapm_client.begin_transaction("test_type") + for i in range(5): + with elasticapm.capture_span("nodrop"): + pass + for i in range(10): + with elasticapm.capture_span("drop"): + pass + transaction_obj = elasticapm_client.end_transaction("test") + + transaction = elasticapm_client.events[constants.TRANSACTION][0] + spans = elasticapm_client.events[constants.SPAN] + assert all(span["transaction_id"] == transaction["id"] for span in spans) + + assert transaction_obj.dropped_spans == 10 + assert len(spans) == 5 + for span in spans: + assert span["name"] == "nodrop" + assert transaction["span_count"] == {"dropped": 10, "started": 5} + + +@pytest.mark.parametrize("elasticapm_client", [{"transaction_max_spans": 3}], indirect=True) +def test_transaction_max_span_nested(elasticapm_client): + elasticapm_client.begin_transaction("test_type") + with elasticapm.capture_span("1"): + with elasticapm.capture_span("2"): + with elasticapm.capture_span("3"): + with elasticapm.capture_span("4"): + with elasticapm.capture_span("5"): + pass + with elasticapm.capture_span("6"): + pass + with elasticapm.capture_span("7"): + pass + with elasticapm.capture_span("8"): + pass + with elasticapm.capture_span("9"): + pass + transaction_obj = elasticapm_client.end_transaction("test") + + transaction = elasticapm_client.events[constants.TRANSACTION][0] + spans = elasticapm_client.events[constants.SPAN] + + assert transaction_obj.dropped_spans == 6 + assert len(spans) == 3 + for span in spans: + assert span["name"] in ("1", "2", "3") + assert transaction["span_count"] == {"dropped": 6, "started": 3} + + +@pytest.mark.parametrize("elasticapm_client", [{"transaction_max_spans": 1}], indirect=True) +def test_transaction_max_span_dropped_statistics(elasticapm_client): + elasticapm_client.begin_transaction("test_type") + with elasticapm.capture_span("not_dropped"): + pass + for i in range(10): + resource = str(i % 2) + with elasticapm.capture_span( + span_type="x", span_subtype="y", extra={"destination": {"service": {"resource": resource}}}, duration=100 + ): + pass + elasticapm_client.end_transaction() + transaction = elasticapm_client.events[constants.TRANSACTION][0] + spans = elasticapm_client.events[constants.SPAN] + + assert len(spans) == 1 + for entry in transaction["dropped_spans_stats"]: + assert entry["duration"]["count"] == 5 + assert entry["duration"]["sum"]["us"] == 500000000 + + +@pytest.mark.parametrize("elasticapm_client", [{"transaction_max_spans": 1, "server_version": (7, 15)}], indirect=True) +def test_transaction_max_span_dropped_statistics_not_collected_for_incompatible_server(elasticapm_client): + elasticapm_client.begin_transaction("test_type") + with elasticapm.capture_span("not_dropped"): + pass + with elasticapm.capture_span( + span_type="x", span_subtype="x", extra={"destination": {"service": {"resource": "y"}}}, duration=100 + ): + pass + elasticapm_client.end_transaction() + transaction = elasticapm_client.events[constants.TRANSACTION][0] + spans = elasticapm_client.events[constants.SPAN] + assert len(spans) == 1 + assert "dropped_spans_stats" not in transaction