From aad1525f155d43bd717c840b91a998916143eb6d Mon Sep 17 00:00:00 2001 From: Benjamin Wohlwend Date: Thu, 9 Sep 2021 10:05:18 +0200 Subject: [PATCH] Implemented span compression algorithm See https://github.com/elastic/apm/issues/432 --- elasticapm/conf/__init__.py | 12 + elasticapm/instrumentation/packages/dbapi2.py | 1 + .../instrumentation/packages/httpcore.py | 2 + .../instrumentation/packages/httplib2.py | 2 + elasticapm/instrumentation/packages/urllib.py | 2 + .../instrumentation/packages/urllib3.py | 2 + elasticapm/traces.py | 237 +++++++++++++----- elasticapm/utils/__init__.py | 28 ++- tests/client/span_compression_tests.py | 144 +++++++++++ tests/utils/tests.py | 20 ++ 10 files changed, 383 insertions(+), 67 deletions(-) create mode 100644 tests/client/span_compression_tests.py diff --git a/elasticapm/conf/__init__.py b/elasticapm/conf/__init__.py index 2a35d4d60..25f13b4ec 100644 --- a/elasticapm/conf/__init__.py +++ b/elasticapm/conf/__init__.py @@ -579,6 +579,18 @@ class Config(_ConfigBase): ], type=int, ) + span_compression_exact_match_max_duration = _ConfigValue( + "span_compression_exact_match_max_duration", + default=5, + validators=[duration_validator], + type=int, + ) + span_compression_same_kind_max_duration = _ConfigValue( + "span_compression_exact_match_max_duration", + default=5, + validators=[duration_validator], + type=int, + ) collect_local_variables = _ConfigValue("COLLECT_LOCAL_VARIABLES", default="errors") source_lines_error_app_frames = _ConfigValue("SOURCE_LINES_ERROR_APP_FRAMES", type=int, default=5) source_lines_error_library_frames = _ConfigValue("SOURCE_LINES_ERROR_LIBRARY_FRAMES", type=int, default=5) diff --git a/elasticapm/instrumentation/packages/dbapi2.py b/elasticapm/instrumentation/packages/dbapi2.py index 33e0fbcf1..39ab137a4 100644 --- a/elasticapm/instrumentation/packages/dbapi2.py +++ b/elasticapm/instrumentation/packages/dbapi2.py @@ -237,6 +237,7 @@ def _trace_sql(self, method, sql, params, action=QUERY_ACTION): span_action=action, extra={"db": {"type": "sql", "statement": sql_string}, "destination": self._self_destination_info}, skip_frames=1, + leaf=True, ) as span: if params is None: result = method(sql) diff --git a/elasticapm/instrumentation/packages/httpcore.py b/elasticapm/instrumentation/packages/httpcore.py index 550939d9d..eebcfa7ed 100644 --- a/elasticapm/instrumentation/packages/httpcore.py +++ b/elasticapm/instrumentation/packages/httpcore.py @@ -96,6 +96,8 @@ def call(self, module, method, wrapped, instance, args, kwargs): span_id=parent_id, trace_options=TracingOptions(recorded=True) ) self._set_disttracing_headers(headers, trace_parent, transaction) + if leaf_span: + leaf_span.dist_tracing_propagated = True response = wrapped(*args, **kwargs) if len(response) > 4: # httpcore < 0.11.0 diff --git a/elasticapm/instrumentation/packages/httplib2.py b/elasticapm/instrumentation/packages/httplib2.py index 6643af2cb..5c254207e 100644 --- a/elasticapm/instrumentation/packages/httplib2.py +++ b/elasticapm/instrumentation/packages/httplib2.py @@ -93,6 +93,8 @@ def call(self, module, method, wrapped, instance, args, kwargs): span_id=parent_id, trace_options=TracingOptions(recorded=True) ) self._set_disttracing_headers(params["headers"], trace_parent, transaction) + if leaf_span: + leaf_span.dist_tracing_propagated = True response, content = wrapped(*args, **kwargs) if span.context: diff --git a/elasticapm/instrumentation/packages/urllib.py b/elasticapm/instrumentation/packages/urllib.py index f8ca6ec7c..c3629a88e 100644 --- a/elasticapm/instrumentation/packages/urllib.py +++ b/elasticapm/instrumentation/packages/urllib.py @@ -96,6 +96,8 @@ def call(self, module, method, wrapped, instance, args, kwargs): span_id=parent_id, trace_options=TracingOptions(recorded=True) ) self._set_disttracing_headers(request_object, trace_parent, transaction) + if leaf_span: + leaf_span.dist_tracing_propagated = True response = wrapped(*args, **kwargs) if response: status = getattr(response, "status", None) or response.getcode() # Python 2 compat diff --git a/elasticapm/instrumentation/packages/urllib3.py b/elasticapm/instrumentation/packages/urllib3.py index baf332412..35a08f750 100644 --- a/elasticapm/instrumentation/packages/urllib3.py +++ b/elasticapm/instrumentation/packages/urllib3.py @@ -127,6 +127,8 @@ def call(self, module, method, wrapped, instance, args, kwargs): span_id=parent_id, trace_options=TracingOptions(recorded=True) ) args, kwargs = update_headers(args, kwargs, instance, transaction, trace_parent) + if leaf_span: + leaf_span.dist_tracing_propagated = True response = wrapped(*args, **kwargs) if response: if span.context: diff --git a/elasticapm/traces.py b/elasticapm/traces.py index 8cce0eee8..a7ef6bd6c 100644 --- a/elasticapm/traces.py +++ b/elasticapm/traces.py @@ -35,12 +35,13 @@ import time import timeit from collections import defaultdict +from typing import Callable, Optional, Union from elasticapm.conf import constants from elasticapm.conf.constants import LABEL_RE, SPAN, TRANSACTION from elasticapm.context import init_execution_context from elasticapm.metrics.base_metrics import Timer -from elasticapm.utils import compat, encoding, get_name_from_func +from elasticapm.utils import compat, encoding, get_name_from_func, nested_key from elasticapm.utils.disttracing import TraceParent, TracingOptions from elasticapm.utils.logging import get_logger @@ -54,31 +55,33 @@ execution_context = init_execution_context() +SpanType = Union["Span", "BaseSpan"] + class ChildDuration(object): __slots__ = ("obj", "_nesting_level", "_start", "_duration", "_lock") - def __init__(self, obj): + def __init__(self, obj: "BaseSpan"): self.obj = obj - self._nesting_level = 0 - self._start = None - self._duration = 0 + self._nesting_level: int = 0 + self._start: Optional[float] = None + self._duration: float = 0 self._lock = threading.Lock() - def start(self, timestamp): + def start(self, timestamp: float): with self._lock: self._nesting_level += 1 if self._nesting_level == 1: self._start = timestamp - def stop(self, timestamp): + def stop(self, timestamp: float): with self._lock: self._nesting_level -= 1 if self._nesting_level == 0: self._duration += timestamp - self._start @property - def duration(self): + def duration(self) -> float: return self._duration @@ -86,17 +89,34 @@ class BaseSpan(object): def __init__(self, labels=None): self._child_durations = ChildDuration(self) self.labels = {} - self.outcome = None + self.outcome: Optional[str] = None + self.compression_buffer: Optional[Union[Span, DroppedSpan]] = None + self.compression_buffer_lock = threading.Lock() if labels: self.label(**labels) def child_started(self, timestamp): self._child_durations.start(timestamp) - def child_ended(self, timestamp): - self._child_durations.stop(timestamp) + def child_ended(self, child: SpanType): + with self.compression_buffer_lock: + if not child.is_compression_eligible(): + if self.compression_buffer: + self.tracer.queue_func(SPAN, self.compression_buffer.to_dict()) + self.compression_buffer = None + self.tracer.queue_func(SPAN, child.to_dict()) + return + if self.compression_buffer is None: + self.compression_buffer = child + return + if not self.compression_buffer.try_to_compress(child): + self.tracer.queue_func(SPAN, self.compression_buffer.to_dict()) + self.compression_buffer = child - def end(self, skip_frames=0, duration=None): + def end(self, skip_frames: int = 0, duration: Optional[int] = None): + raise NotImplementedError() + + def to_dict(self) -> dict: raise NotImplementedError() def label(self, **labels): @@ -115,19 +135,29 @@ def label(self, **labels): self.labels.update(labels) def set_success(self): - self.outcome = "success" + self.outcome = constants.OUTCOME.SUCCESS def set_failure(self): - self.outcome = "failure" + self.outcome = constants.OUTCOME.FAILURE @staticmethod - def get_dist_tracing_id(): + def get_dist_tracing_id() -> str: return "%016x" % random.getrandbits(64) + @property + def tracer(self) -> "Tracer": + raise NotImplementedError() + class Transaction(BaseSpan): def __init__( - self, tracer, transaction_type="custom", trace_parent=None, is_sampled=True, start=None, sample_rate=None + self, + tracer: "Tracer", + transaction_type: str = "custom", + trace_parent: Optional[TraceParent] = None, + is_sampled: bool = True, + start: Optional[float] = None, + sample_rate: Optional[float] = None, ): self.id = self.get_dist_tracing_id() self.trace_parent = trace_parent @@ -135,18 +165,18 @@ def __init__( self.timestamp = self.start_time = start else: self.timestamp, self.start_time = time.time(), _time_func() - self.name = None - self.duration = None - self.result = None + self.name: Optional[str] = None + self.duration: Optional[float] = None + self.result: Optional[str] = None self.transaction_type = transaction_type - self.tracer = tracer + self._tracer = tracer - self.dropped_spans = 0 + self.dropped_spans: int = 0 self.context = {} self._is_sampled = is_sampled self.sample_rate = sample_rate - self._span_counter = 0 + self._span_counter: int = 0 self._span_timers = defaultdict(Timer) self._span_timers_lock = threading.Lock() try: @@ -163,8 +193,10 @@ def __init__( self._transaction_metrics = None super(Transaction, self).__init__() - def end(self, skip_frames=0, duration=None): + def end(self, skip_frames: int = 0, duration: Optional[float] = None): self.duration = duration if duration is not None else (_time_func() - self.start_time) + if self.compression_buffer: + self.tracer.queue_func(SPAN, self.compression_buffer.to_dict()) if self._transaction_metrics: self._transaction_metrics.timer( "transaction.duration", @@ -274,7 +306,7 @@ def begin_span( start=start, ) - def end_span(self, skip_frames=0, duration=None, outcome="unknown"): + def end_span(self, skip_frames: int = 0, duration: Optional[float] = None, outcome: str = "unknown"): """ End the currently active span :param skip_frames: numbers of frames to skip in the stack trace @@ -293,7 +325,7 @@ def end_span(self, skip_frames=0, duration=None, outcome="unknown"): span.end(skip_frames=skip_frames, duration=duration) return span - def ensure_parent_id(self): + def ensure_parent_id(self) -> str: """If current trace_parent has no span_id, generate one, then return it This is used to generate a span ID which the RUM agent will use to correlate @@ -304,7 +336,7 @@ def ensure_parent_id(self): logger.debug("Set parent id to generated %s", self.trace_parent.span_id) return self.trace_parent.span_id - def to_dict(self): + def to_dict(self) -> dict: self.context["tags"] = self.labels result = { "id": self.id, @@ -336,7 +368,7 @@ def track_span_duration(self, span_type, span_subtype, self_duration): self._span_timers[(span_type, span_subtype)].update(self_duration) @property - def is_sampled(self): + def is_sampled(self) -> bool: return self._is_sampled @is_sampled.setter @@ -352,6 +384,10 @@ def is_sampled(self, is_sampled): self.sample_rate = "0" self.trace_parent.add_tracestate(constants.TRACESTATE.SAMPLE_RATE, self.sample_rate) + @property + def tracer(self) -> "Tracer": + return self._tracer + class Span(BaseSpan): __slots__ = ( @@ -363,6 +399,7 @@ class Span(BaseSpan): "action", "context", "leaf", + "dist_tracing_propagated", "timestamp", "start_time", "duration", @@ -377,18 +414,18 @@ class Span(BaseSpan): def __init__( self, - transaction, - name, - span_type, - context=None, - leaf=False, - labels=None, - parent=None, - parent_span_id=None, - span_subtype=None, - span_action=None, - sync=None, - start=None, + transaction: Transaction, + name: str, + span_type: str, + context: Optional[dict] = None, + leaf: bool = False, + labels: Optional[dict] = None, + parent: Optional["Span"] = None, + parent_span_id: Optional[str] = None, + span_subtype: Optional[str] = None, + span_action: Optional[str] = None, + sync: Optional[bool] = None, + start: Optional[int] = None, ): """ Create a new Span @@ -421,6 +458,8 @@ def __init__( self.parent_span_id = parent_span_id self.frames = None self.sync = sync + self.dist_tracing_propagated = False + self.composite: Union[dict, None] = None if span_subtype is None and "." in span_type: # old style dottet type, let's split it up type_bits = span_type.split(".") @@ -436,7 +475,7 @@ def __init__( p.child_started(self.start_time) super(Span, self).__init__(labels=labels) - def to_dict(self): + def to_dict(self) -> dict: result = { "id": self.id, "transaction_id": self.transaction.id, @@ -463,9 +502,38 @@ def to_dict(self): result["context"] = self.context if self.frames: result["stacktrace"] = self.frames + if self.composite: + result["composite"] = self.composite return result - def end(self, skip_frames=0, duration=None): + def is_same_kind(self, other_span: SpanType) -> bool: + """ + For compression purposes, two spans are considered to be of the same kind if they have the same + values for type, subtype, and destination.service.resource + :param other_span: another span object + :return: bool + """ + resource = nested_key(self.context, "destination", "service", "resource") + return bool( + self.type == other_span.type + and self.subtype == other_span.subtype + and (resource and resource == nested_key(other_span.context, "destination", "service", "resource")) + ) + + def is_exact_match(self, other_span: SpanType) -> bool: + """ + For compression purposes, two spans are considered to be an exact match if the have the same + name and are of the same kind. + + :param other_span: another span object + :return: bool + """ + return bool(self.name == other_span.name and self.is_same_kind(other_span)) + + def is_compression_eligible(self) -> bool: + return self.leaf and not self.dist_tracing_propagated and self.outcome in (None, constants.OUTCOME.SUCCESS) + + def end(self, skip_frames: int = 0, duration: Optional[int] = None): """ End this span and queue it for sending. @@ -481,13 +549,59 @@ def end(self, skip_frames=0, duration=None): else: self.frames = None execution_context.set_span(self.parent) - tracer.queue_func(SPAN, self.to_dict()) + + p = self.parent if self.parent else self.transaction if self.transaction._breakdown: - p = self.parent if self.parent else self.transaction - p.child_ended(self.start_time + self.duration) + self._child_durations.stop(self.start_time + self.duration) self.transaction.track_span_duration( self.type, self.subtype, self.duration - self._child_durations.duration ) + p.child_ended(self) + + def try_to_compress(self, sibling: SpanType) -> bool: + is_composite = self.composite is not None + can_be_compressed = ( + self._try_to_compress_composite(sibling) if is_composite else self._try_to_compress_regular(sibling) + ) + if not can_be_compressed: + return False + + if not is_composite: + self.composite["count"] = 1 + self.composite["sum"] = self.duration + self.composite["count"] += 1 + self.composite["sum"] += sibling.duration + return True + + def _try_to_compress_composite(self, sibling: SpanType) -> bool: + if self.composite["compression_strategy"] == "exact_match": + return ( + self.is_exact_match(sibling) + and sibling.duration <= self.transaction.tracer.config.span_compression_exact_match_max_duration + ) + elif self.composite["compression_strategy"] == "same_kind": + return ( + self.is_same_kind(sibling) + and sibling.duration <= self.transaction.tracer.config.span_compression_same_kind_max_duration + ) + # can't handle other compression strategies + return False + + def _try_to_compress_regular(self, sibling: SpanType) -> bool: + if not self.is_same_kind(sibling): + return False + if self.name == sibling.name: + max_duration = self.transaction.tracer.config.span_compression_exact_match_max_duration + if self.duration <= max_duration and sibling.duration <= max_duration: + self.composite = {"compression_strategy": "exact_match"} + return True + return False + max_duration = self.transaction.tracer.config.span_compression_same_kind_max_duration + if self.duration <= max_duration and sibling.duration <= max_duration: + self.composite = {"compression_strategy": "same_kind"} + self.name = "Calls to " + self.context["destination"]["service"]["resource"] + return True + return False def update_context(self, key, data): """ @@ -503,14 +617,19 @@ def update_context(self, key, data): def __str__(self): return u"{}/{}/{}".format(self.name, self.type, self.subtype) + @property + def tracer(self) -> "Tracer": + return self.transaction.tracer + class DroppedSpan(BaseSpan): - __slots__ = ("leaf", "parent", "id") + __slots__ = ("leaf", "parent", "id", "dist_tracing_propagated") def __init__(self, parent, leaf=False): self.parent = parent self.leaf = leaf self.id = None + self.dist_tracing_propagated = False super(DroppedSpan, self).__init__() def end(self, skip_frames=0, duration=None): @@ -519,7 +638,7 @@ def end(self, skip_frames=0, duration=None): def child_started(self, timestamp): pass - def child_ended(self, timestamp): + def child_ended(self, child: SpanType): pass def update_context(self, key, data): @@ -651,17 +770,17 @@ class capture_span(object): def __init__( self, - name=None, - span_type="code.custom", - extra=None, - skip_frames=0, - leaf=False, - labels=None, - span_subtype=None, - span_action=None, - start=None, - duration=None, - sync=None, + name: Optional[str] = None, + span_type: str = "code.custom", + extra: Optional[dict] = None, + skip_frames: int = 0, + leaf: bool = False, + labels: Optional[dict] = None, + span_subtype: Optional[str] = None, + span_action: Optional[str] = None, + start: Optional[int] = None, + duration: Optional[int] = None, + sync: Optional[bool] = None, ): self.name = name self.type = span_type @@ -675,7 +794,7 @@ def __init__( self.duration = duration self.sync = sync - def __call__(self, func): + def __call__(self, func: Callable) -> Callable: self.name = self.name or get_name_from_func(func) @functools.wraps(func) @@ -685,7 +804,7 @@ def decorated(*args, **kwds): return decorated - def __enter__(self): + def __enter__(self) -> Union[Span, DroppedSpan, None]: transaction = execution_context.get_transaction() if transaction and transaction.is_sampled: return transaction.begin_span( diff --git a/elasticapm/utils/__init__.py b/elasticapm/utils/__init__.py index 12084c478..b823a9e5d 100644 --- a/elasticapm/utils/__init__.py +++ b/elasticapm/utils/__init__.py @@ -32,6 +32,8 @@ import os import re from functools import partial +from types import FunctionType +from typing import Pattern from elasticapm.conf import constants from elasticapm.utils import compat, encoding @@ -73,7 +75,7 @@ def varmap(func, var, context=None, name=None, **kwargs): return ret -def get_name_from_func(func): +def get_name_from_func(func: FunctionType) -> str: # partials don't have `__module__` or `__name__`, so we use the values from the "inner" function if isinstance(func, partial_types): return "partial({})".format(get_name_from_func(func.func)) @@ -94,7 +96,7 @@ def build_name_with_http_method_prefix(name, request): return " ".join((request.method, name)) if name else name -def is_master_process(): +def is_master_process() -> bool: # currently only recognizes uwsgi master process try: import uwsgi @@ -104,7 +106,7 @@ def is_master_process(): return False -def get_url_dict(url): +def get_url_dict(url: str) -> dict: parse_result = compat.urlparse.urlparse(url) url_dict = { @@ -123,14 +125,14 @@ def get_url_dict(url): return url_dict -def sanitize_url(url): +def sanitize_url(url: str) -> str: if "@" not in url: return url parts = compat.urlparse.urlparse(url) return url.replace("%s:%s" % (parts.username, parts.password), "%s:%s" % (parts.username, constants.MASK)) -def get_host_from_url(url): +def get_host_from_url(url: str) -> str: parsed_url = compat.urlparse.urlparse(url) host = parsed_url.hostname or " " @@ -140,7 +142,7 @@ def get_host_from_url(url): return host -def url_to_destination(url, service_type="external"): +def url_to_destination(url: str, service_type: str = "external") -> dict: parts = compat.urlparse.urlsplit(url) hostname = parts.hostname if parts.hostname else "" # preserve brackets for IPv6 URLs @@ -163,7 +165,7 @@ def url_to_destination(url, service_type="external"): return {"service": {"name": name, "resource": resource, "type": service_type}} -def read_pem_file(file_obj): +def read_pem_file(file_obj) -> bytes: cert = b"" for line in file_obj: if line.startswith(b"-----BEGIN CERTIFICATE-----"): @@ -176,7 +178,7 @@ def read_pem_file(file_obj): return base64.b64decode(cert) -def starmatch_to_regex(pattern): +def starmatch_to_regex(pattern: str) -> Pattern: options = re.DOTALL # check if we are case sensitive if pattern.startswith("(?-i)"): @@ -193,3 +195,13 @@ def starmatch_to_regex(pattern): else: res.append(re.escape(c)) return re.compile(r"(?:%s)\Z" % "".join(res), options) + + +def nested_key(d: dict, *args): + for arg in args: + try: + d = d[arg] + except (TypeError, KeyError): + d = None + break + return d diff --git a/tests/client/span_compression_tests.py b/tests/client/span_compression_tests.py new file mode 100644 index 000000000..d0ba85b63 --- /dev/null +++ b/tests/client/span_compression_tests.py @@ -0,0 +1,144 @@ +# BSD 3-Clause License +# +# Copyright (c) 2021, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import elasticapm +from elasticapm.conf.constants import SPAN + + +def test_exact_match(elasticapm_client): + transaction = elasticapm_client.begin_transaction("test") + with elasticapm.capture_span( + "test", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span1: + assert span1.is_compression_eligible() + with elasticapm.capture_span( + "test", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=3, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span2: + assert span2.is_compression_eligible() + assert span1.is_exact_match(span2) + elasticapm_client.end_transaction("test") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 1 + span = spans[0] + assert "composite" in span + assert span["composite"]["count"] == 2 + assert span["composite"]["sum"] == 5 + assert span["composite"]["compression_strategy"] == "exact_match" + + +def test_same_kind(elasticapm_client): + transaction = elasticapm_client.begin_transaction("test") + with elasticapm.capture_span( + "test1", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span1: + assert span1.is_compression_eligible() + with elasticapm.capture_span( + "test2", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=3, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span2: + assert span2.is_compression_eligible() + assert not span1.is_exact_match(span2) + assert span1.is_same_kind(span2) + elasticapm_client.end_transaction("test") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 1 + span = spans[0] + + assert span["name"] == "Calls to x" + assert "composite" in span + assert span["composite"]["count"] == 2 + assert span["composite"]["sum"] == 5 + assert span["composite"]["compression_strategy"] == "same_kind" + + +def test_exact_match_after_same_kind(elasticapm_client): + # if a span that is an exact match is attempted to be compressed with a same_kind composite, it stays same_kind + transaction = elasticapm_client.begin_transaction("test") + with elasticapm.capture_span( + "test1", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span1: + assert span1.is_compression_eligible() + with elasticapm.capture_span( + "test2", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=3, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span2: + assert span2.is_compression_eligible() + assert not span1.is_exact_match(span2) + assert span1.is_same_kind(span2) + with elasticapm.capture_span( + "test1", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span3: + assert span3.is_compression_eligible() + elasticapm_client.end_transaction("test") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 1 + span = spans[0] + assert span["composite"]["compression_strategy"] == "same_kind" + assert span["composite"]["count"] == 3 diff --git a/tests/utils/tests.py b/tests/utils/tests.py index 38109b9de..ff2b4a0f8 100644 --- a/tests/utils/tests.py +++ b/tests/utils/tests.py @@ -37,6 +37,7 @@ from elasticapm.utils import ( get_name_from_func, get_url_dict, + nested_key, read_pem_file, sanitize_url, starmatch_to_regex, @@ -239,3 +240,22 @@ def test_read_pem_file_chain(): with open(os.path.join(os.path.dirname(__file__), "..", "ca", "chain.crt"), mode="rb") as f: result = read_pem_file(f) assert result.endswith(b"\xc8\xae") + + +@pytest.mark.parametrize( + "data,key,expected", + [ + (None, "x", None), + ({}, "x", None), + ({"x": 1}, "x", 1), + ({"x": {"y": 1}}, "x.y", 1), + ({"x": 1}, "x.y", None), + ({"x": {"y": {}}}, "x.y.z", None), + ], +) +def test_nested_key(data, key, expected): + r = nested_key(data, *key.split(".")) + if expected is None: + assert r is expected + else: + assert r == expected