From 962477524b15a1d04439d27561920d9b79dda739 Mon Sep 17 00:00:00 2001 From: Benjamin Wohlwend Date: Sat, 16 Oct 2021 08:25:33 +0200 Subject: [PATCH] Implemented span compression algorithm (#1321) * Implemented span compression algorithm See https://github.com/elastic/apm/issues/432 * disable span compression by default in tests * make _try_to_compress_composite and _try_to_compress_regular side effect free * fix issue with compression buffer not being reported * fix some type hinting issues/errors * move traceparent initialization into Transaction constructor This cleans up a wart where `traceparent` is initialized to None, and needs to be set by the callee right after, as there is code that assumes `traceparent` to be set. * fix breakdown metrics calculation * fix breakdown metrics test we no longer only call `child_ended` for breakdown metrics * update duration of the composite span based on compressed spans * add docs for compressed span config options * clean up merge Co-authored-by: Colton Myers --- docs/configuration.asciidoc | 43 +++ elasticapm/conf/__init__.py | 12 + elasticapm/instrumentation/packages/dbapi2.py | 1 + .../instrumentation/packages/httpcore.py | 2 + .../instrumentation/packages/httplib2.py | 2 + elasticapm/instrumentation/packages/urllib.py | 2 + .../instrumentation/packages/urllib3.py | 2 + elasticapm/traces.py | 302 +++++++++++++----- elasticapm/utils/__init__.py | 18 +- tests/client/span_compression_tests.py | 222 +++++++++++++ tests/contrib/django/fixtures.py | 4 + tests/fixtures.py | 6 + tests/metrics/breakdown_tests.py | 5 +- 13 files changed, 536 insertions(+), 85 deletions(-) create mode 100644 tests/client/span_compression_tests.py diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 637c830de..245e823bf 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -691,6 +691,49 @@ To disable stack trace collection for spans completely, set the value to `0`. 
Except for the special values `-1` and `0`, this setting has to be provided in *<>*. +[float] +[[config-span-compression-exact-match-max_duration]] +==== `span_compression_exact_match_max_duration` + +<> + +[options="header"] +|============ +| Environment | Django/Flask | Default +| `ELASTIC_APM_SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` | `SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION` | `"50ms"` +|============ + +Consecutive spans that are exact matches and that are under this threshold will be compressed into a single composite span. +This reduces the collection, processing, and storage overhead, and removes clutter from the UI. +The tradeoff is that the DB statements of all the compressed spans will not be collected. + +Two spans are considered exact matches if the following attributes are identical: + * span name + * span type + * span subtype + * destination resource (e.g. the Database name) + +[float] +[[config-span-compression-same-kind-max-duration]] +==== `span_compression_same_kind_max_duration` + +<> + +[options="header"] +|============ +| Environment | Django/Flask | Default +| `ELASTIC_APM_SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` | `SPAN_COMPRESSION_SAME_KIND_MAX_DURATION` | `"5ms"` +|============ + +Consecutive spans to the same destination that are under this threshold will be compressed into a single composite span. +This reduces the collection, processing, and storage overhead, and removes clutter from the UI. +The tradeoff is that metadata such as database statements of all the compressed spans will not be collected. + +Two spans are considered to be of the same kind if the following attributes are identical: + * span type + * span subtype + * destination resource (e.g. 
the Database name) + [float] [[config-api-request-size]] ==== `api_request_size` diff --git a/elasticapm/conf/__init__.py b/elasticapm/conf/__init__.py index 2a35d4d60..25f13b4ec 100644 --- a/elasticapm/conf/__init__.py +++ b/elasticapm/conf/__init__.py @@ -579,6 +579,18 @@ class Config(_ConfigBase): ], type=int, ) + span_compression_exact_match_max_duration = _ConfigValue( + "span_compression_exact_match_max_duration", + default=5, + validators=[duration_validator], + type=int, + ) + span_compression_same_kind_max_duration = _ConfigValue( + "span_compression_exact_match_max_duration", + default=5, + validators=[duration_validator], + type=int, + ) collect_local_variables = _ConfigValue("COLLECT_LOCAL_VARIABLES", default="errors") source_lines_error_app_frames = _ConfigValue("SOURCE_LINES_ERROR_APP_FRAMES", type=int, default=5) source_lines_error_library_frames = _ConfigValue("SOURCE_LINES_ERROR_LIBRARY_FRAMES", type=int, default=5) diff --git a/elasticapm/instrumentation/packages/dbapi2.py b/elasticapm/instrumentation/packages/dbapi2.py index 33e0fbcf1..39ab137a4 100644 --- a/elasticapm/instrumentation/packages/dbapi2.py +++ b/elasticapm/instrumentation/packages/dbapi2.py @@ -237,6 +237,7 @@ def _trace_sql(self, method, sql, params, action=QUERY_ACTION): span_action=action, extra={"db": {"type": "sql", "statement": sql_string}, "destination": self._self_destination_info}, skip_frames=1, + leaf=True, ) as span: if params is None: result = method(sql) diff --git a/elasticapm/instrumentation/packages/httpcore.py b/elasticapm/instrumentation/packages/httpcore.py index 550939d9d..eebcfa7ed 100644 --- a/elasticapm/instrumentation/packages/httpcore.py +++ b/elasticapm/instrumentation/packages/httpcore.py @@ -96,6 +96,8 @@ def call(self, module, method, wrapped, instance, args, kwargs): span_id=parent_id, trace_options=TracingOptions(recorded=True) ) self._set_disttracing_headers(headers, trace_parent, transaction) + if leaf_span: + leaf_span.dist_tracing_propagated = 
True response = wrapped(*args, **kwargs) if len(response) > 4: # httpcore < 0.11.0 diff --git a/elasticapm/instrumentation/packages/httplib2.py b/elasticapm/instrumentation/packages/httplib2.py index 6643af2cb..5c254207e 100644 --- a/elasticapm/instrumentation/packages/httplib2.py +++ b/elasticapm/instrumentation/packages/httplib2.py @@ -93,6 +93,8 @@ def call(self, module, method, wrapped, instance, args, kwargs): span_id=parent_id, trace_options=TracingOptions(recorded=True) ) self._set_disttracing_headers(params["headers"], trace_parent, transaction) + if leaf_span: + leaf_span.dist_tracing_propagated = True response, content = wrapped(*args, **kwargs) if span.context: diff --git a/elasticapm/instrumentation/packages/urllib.py b/elasticapm/instrumentation/packages/urllib.py index f8ca6ec7c..c3629a88e 100644 --- a/elasticapm/instrumentation/packages/urllib.py +++ b/elasticapm/instrumentation/packages/urllib.py @@ -96,6 +96,8 @@ def call(self, module, method, wrapped, instance, args, kwargs): span_id=parent_id, trace_options=TracingOptions(recorded=True) ) self._set_disttracing_headers(request_object, trace_parent, transaction) + if leaf_span: + leaf_span.dist_tracing_propagated = True response = wrapped(*args, **kwargs) if response: status = getattr(response, "status", None) or response.getcode() # Python 2 compat diff --git a/elasticapm/instrumentation/packages/urllib3.py b/elasticapm/instrumentation/packages/urllib3.py index baf332412..35a08f750 100644 --- a/elasticapm/instrumentation/packages/urllib3.py +++ b/elasticapm/instrumentation/packages/urllib3.py @@ -127,6 +127,8 @@ def call(self, module, method, wrapped, instance, args, kwargs): span_id=parent_id, trace_options=TracingOptions(recorded=True) ) args, kwargs = update_headers(args, kwargs, instance, transaction, trace_parent) + if leaf_span: + leaf_span.dist_tracing_propagated = True response = wrapped(*args, **kwargs) if response: if span.context: diff --git a/elasticapm/traces.py b/elasticapm/traces.py 
index bf352ef3c..c6f465e0d 100644 --- a/elasticapm/traces.py +++ b/elasticapm/traces.py @@ -35,12 +35,13 @@ import time import timeit from collections import defaultdict +from typing import Any, Callable, Dict, Optional, Tuple, Union from elasticapm.conf import constants from elasticapm.conf.constants import LABEL_RE, SPAN, TRANSACTION from elasticapm.context import init_execution_context from elasticapm.metrics.base_metrics import Timer -from elasticapm.utils import compat, encoding, get_name_from_func +from elasticapm.utils import compat, encoding, get_name_from_func, nested_key from elasticapm.utils.disttracing import TraceParent, TracingOptions from elasticapm.utils.logging import get_logger @@ -54,31 +55,33 @@ execution_context = init_execution_context() +SpanType = Union["Span", "DroppedSpan"] + class ChildDuration(object): __slots__ = ("obj", "_nesting_level", "_start", "_duration", "_lock") - def __init__(self, obj): + def __init__(self, obj: "BaseSpan"): self.obj = obj - self._nesting_level = 0 - self._start = None - self._duration = 0 + self._nesting_level: int = 0 + self._start: float = 0 + self._duration: float = 0 self._lock = threading.Lock() - def start(self, timestamp): + def start(self, timestamp: float): with self._lock: self._nesting_level += 1 if self._nesting_level == 1: self._start = timestamp - def stop(self, timestamp): + def stop(self, timestamp: float): with self._lock: self._nesting_level -= 1 if self._nesting_level == 0: self._duration += timestamp - self._start @property - def duration(self): + def duration(self) -> float: return self._duration @@ -86,20 +89,40 @@ class BaseSpan(object): def __init__(self, labels=None, start=None): self._child_durations = ChildDuration(self) self.labels = {} - self.outcome = None - self.start_time = start or _time_func() - self.duration = None + self.outcome: Optional[str] = None + self.compression_buffer: Optional[Union[Span, DroppedSpan]] = None + self.compression_buffer_lock = threading.Lock() + 
self.start_time: float = start or _time_func() + self.ended_time: Optional[float] = None + self.duration: Optional[float] = None if labels: self.label(**labels) def child_started(self, timestamp): self._child_durations.start(timestamp) - def child_ended(self, timestamp): - self._child_durations.stop(timestamp) - - def end(self, skip_frames=0, duration=None): - self.duration = duration if duration is not None else (_time_func() - self.start_time) + def child_ended(self, child: SpanType): + with self.compression_buffer_lock: + if not child.is_compression_eligible(): + if self.compression_buffer: + self.compression_buffer.report() + self.compression_buffer = None + child.report() + elif self.compression_buffer is None: + self.compression_buffer = child + elif not self.compression_buffer.try_to_compress(child): + self.compression_buffer.report() + self.compression_buffer = child + + def end(self, skip_frames: int = 0, duration: Optional[float] = None): + self.ended_time = _time_func() + self.duration = duration if duration is not None else (self.ended_time - self.start_time) + if self.compression_buffer: + self.compression_buffer.report() + self.compression_buffer = None + + def to_dict(self) -> dict: + raise NotImplementedError() def label(self, **labels): """ @@ -117,35 +140,53 @@ def label(self, **labels): self.labels.update(labels) def set_success(self): - self.outcome = "success" + self.outcome = constants.OUTCOME.SUCCESS def set_failure(self): - self.outcome = "failure" + self.outcome = constants.OUTCOME.FAILURE @staticmethod - def get_dist_tracing_id(): + def get_dist_tracing_id() -> str: return "%016x" % random.getrandbits(64) + @property + def tracer(self) -> "Tracer": + raise NotImplementedError() + class Transaction(BaseSpan): def __init__( - self, tracer, transaction_type="custom", trace_parent=None, is_sampled=True, start=None, sample_rate=None + self, + tracer: "Tracer", + transaction_type: str = "custom", + trace_parent: Optional[TraceParent] = None, + 
is_sampled: bool = True, + start: Optional[float] = None, + sample_rate: Optional[float] = None, ): self.id = self.get_dist_tracing_id() - self.trace_parent = trace_parent + if not trace_parent: + trace_parent = TraceParent( + constants.TRACE_CONTEXT_VERSION, + "%032x" % random.getrandbits(128), + self.id, + TracingOptions(recorded=is_sampled), + ) + + self.trace_parent: TraceParent = trace_parent self.timestamp = start if start is not None else time.time() - self.name = None - self.result = None + self.name: Optional[str] = None + self.result: Optional[str] = None self.transaction_type = transaction_type - self.tracer = tracer + self._tracer = tracer - self.dropped_spans = 0 - self.context = {} + self.dropped_spans: int = 0 + self.context: Dict[str, Any] = {} self._is_sampled = is_sampled self.sample_rate = sample_rate - self._span_counter = 0 - self._span_timers = defaultdict(Timer) + self._span_counter: int = 0 + self._span_timers: Dict[Tuple[str, str], Timer] = defaultdict(Timer) self._span_timers_lock = threading.Lock() self._dropped_span_statistics = defaultdict(lambda: {"count": 0, "duration.sum.us": 0}) try: @@ -162,7 +203,7 @@ def __init__( self._transaction_metrics = None super(Transaction, self).__init__() - def end(self, skip_frames=0, duration=None): + def end(self, skip_frames: int = 0, duration: Optional[float] = None): super().end(skip_frames, duration) if self._transaction_metrics: self._transaction_metrics.timer( @@ -273,7 +314,7 @@ def begin_span( start=start, ) - def end_span(self, skip_frames=0, duration=None, outcome="unknown"): + def end_span(self, skip_frames: int = 0, duration: Optional[float] = None, outcome: str = "unknown"): """ End the currently active span :param skip_frames: numbers of frames to skip in the stack trace @@ -292,7 +333,7 @@ def end_span(self, skip_frames=0, duration=None, outcome="unknown"): span.end(skip_frames=skip_frames, duration=duration) return span - def ensure_parent_id(self): + def ensure_parent_id(self) -> 
str: """If current trace_parent has no span_id, generate one, then return it This is used to generate a span ID which the RUM agent will use to correlate @@ -303,7 +344,7 @@ def ensure_parent_id(self): logger.debug("Set parent id to generated %s", self.trace_parent.span_id) return self.trace_parent.span_id - def to_dict(self): + def to_dict(self) -> dict: self.context["tags"] = self.labels result = { "id": self.id, @@ -344,7 +385,7 @@ def track_span_duration(self, span_type, span_subtype, self_duration): self._span_timers[(span_type, span_subtype)].update(self_duration) @property - def is_sampled(self): + def is_sampled(self) -> bool: return self._is_sampled @is_sampled.setter @@ -360,6 +401,10 @@ def is_sampled(self, is_sampled): self.sample_rate = "0" self.trace_parent.add_tracestate(constants.TRACESTATE.SAMPLE_RATE, self.sample_rate) + @property + def tracer(self) -> "Tracer": + return self._tracer + class Span(BaseSpan): __slots__ = ( @@ -371,8 +416,10 @@ class Span(BaseSpan): "action", "context", "leaf", + "dist_tracing_propagated", "timestamp", "start_time", + "ended_time", "duration", "parent", "parent_span_id", @@ -385,18 +432,18 @@ class Span(BaseSpan): def __init__( self, - transaction, - name, - span_type, - context=None, - leaf=False, - labels=None, - parent=None, - parent_span_id=None, - span_subtype=None, - span_action=None, - sync=None, - start=None, + transaction: Transaction, + name: str, + span_type: str, + context: Optional[dict] = None, + leaf: bool = False, + labels: Optional[dict] = None, + parent: Optional["Span"] = None, + parent_span_id: Optional[str] = None, + span_subtype: Optional[str] = None, + span_action: Optional[str] = None, + sync: Optional[bool] = None, + start: Optional[int] = None, ): """ Create a new Span @@ -422,7 +469,6 @@ def __init__( # we take the (non-monotonic) transaction timestamp, and add the (monotonic) difference of span # start time and transaction start time. 
In this respect, the span timestamp is guaranteed to grow # monotonically with respect to the transaction timestamp - self.duration = None self.parent = parent self.parent_span_id = parent_span_id self.frames = None @@ -430,20 +476,30 @@ def __init__( self.type = span_type self.subtype = span_subtype self.action = span_action + self.dist_tracing_propagated = False + self.composite: Dict[str, Any] = {} super(Span, self).__init__(labels=labels, start=start) self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time) if self.transaction._breakdown: p = self.parent if self.parent else self.transaction p.child_started(self.start_time) - def to_dict(self): + def to_dict(self) -> dict: + if ( + self.composite + and self.composite["compression_strategy"] == "same_kind" + and nested_key(self.context, "destination", "service", "resource") + ): + name = "Calls to " + self.context["destination"]["service"]["resource"] + else: + name = self.name result = { "id": self.id, "transaction_id": self.transaction.id, "trace_id": self.transaction.trace_parent.trace_id, # use either the explicitly set parent_span_id, or the id of the parent, or finally the transaction id "parent_id": self.parent_span_id or (self.parent.id if self.parent else self.transaction.id), - "name": encoding.keyword_field(self.name), + "name": encoding.keyword_field(name), "type": encoding.keyword_field(self.type), "subtype": encoding.keyword_field(self.subtype), "action": encoding.keyword_field(self.action), @@ -463,9 +519,42 @@ def to_dict(self): result["context"] = self.context if self.frames: result["stacktrace"] = self.frames + if self.composite: + result["composite"] = { + "compression_strategy": self.composite["compression_strategy"], + "sum": self.composite["sum"] * 1000, + "count": self.composite["count"], + } return result - def end(self, skip_frames=0, duration=None): + def is_same_kind(self, other_span: SpanType) -> bool: + """ + For compression purposes, two spans are 
considered to be of the same kind if they have the same + values for type, subtype, and destination.service.resource + :param other_span: another span object + :return: bool + """ + resource = nested_key(self.context, "destination", "service", "resource") + return bool( + self.type == other_span.type + and self.subtype == other_span.subtype + and (resource and resource == nested_key(other_span.context, "destination", "service", "resource")) + ) + + def is_exact_match(self, other_span: SpanType) -> bool: + """ + For compression purposes, two spans are considered to be an exact match if the have the same + name and are of the same kind. + + :param other_span: another span object + :return: bool + """ + return bool(self.name == other_span.name and self.is_same_kind(other_span)) + + def is_compression_eligible(self) -> bool: + return self.leaf and not self.dist_tracing_propagated and self.outcome in (None, constants.OUTCOME.SUCCESS) + + def end(self, skip_frames: int = 0, duration: Optional[float] = None): """ End this span and queue it for sending. 
@@ -480,13 +569,65 @@ def end(self, skip_frames=0, duration=None): else: self.frames = None execution_context.set_span(self.parent) - tracer.queue_func(SPAN, self.to_dict()) + + p = self.parent if self.parent else self.transaction if self.transaction._breakdown: - p = self.parent if self.parent else self.transaction - p.child_ended(self.start_time + self.duration) + p._child_durations.stop(self.start_time + self.duration) self.transaction.track_span_duration( self.type, self.subtype, self.duration - self._child_durations.duration ) + p.child_ended(self) + + def report(self) -> None: + self.tracer.queue_func(SPAN, self.to_dict()) + + def try_to_compress(self, sibling: SpanType) -> bool: + compression_strategy = ( + self._try_to_compress_composite(sibling) if self.composite else self._try_to_compress_regular(sibling) + ) + if not compression_strategy: + return False + + if not self.composite: + self.composite = {"compression_strategy": compression_strategy, "count": 1, "sum": self.duration} + self.composite["count"] += 1 + self.composite["sum"] += sibling.duration + self.duration = sibling.ended_time - self.start_time + return True + + def _try_to_compress_composite(self, sibling: SpanType) -> Optional[str]: + if self.composite["compression_strategy"] == "exact_match": + return ( + "exact_match" + if ( + self.is_exact_match(sibling) + and sibling.duration <= self.transaction.tracer.config.span_compression_exact_match_max_duration + ) + else None + ) + elif self.composite["compression_strategy"] == "same_kind": + return ( + "same_kind" + if ( + self.is_same_kind(sibling) + and sibling.duration <= self.transaction.tracer.config.span_compression_same_kind_max_duration + ) + else None + ) + return None + + def _try_to_compress_regular(self, sibling: SpanType) -> Optional[str]: + if not self.is_same_kind(sibling): + return None + if self.name == sibling.name: + max_duration = self.transaction.tracer.config.span_compression_exact_match_max_duration + if self.duration <= 
max_duration and sibling.duration <= max_duration: + return "exact_match" + return None + max_duration = self.transaction.tracer.config.span_compression_same_kind_max_duration + if self.duration <= max_duration and sibling.duration <= max_duration: + return "same_kind" + return None def update_context(self, key, data): """ @@ -502,31 +643,49 @@ def update_context(self, key, data): def __str__(self): return u"{}/{}/{}".format(self.name, self.type, self.subtype) + @property + def tracer(self) -> "Tracer": + return self.transaction.tracer + class DroppedSpan(BaseSpan): - __slots__ = ("leaf", "parent", "id", "context", "outcome") + __slots__ = ("leaf", "parent", "id", "context", "outcome", "dist_tracing_propagated") def __init__(self, parent, leaf=False, start=None, context=None): self.parent = parent self.leaf = leaf self.id = None + self.dist_tracing_propagated = False self.context = context self.outcome = constants.OUTCOME.UNKNOWN super(DroppedSpan, self).__init__(start=start) - def end(self, skip_frames=0, duration=None): + def end(self, skip_frames: int = 0, duration: Optional[float] = None): super().end(skip_frames, duration) execution_context.set_span(self.parent) def child_started(self, timestamp): pass - def child_ended(self, timestamp): + def child_ended(self, child: SpanType): pass def update_context(self, key, data): pass + def report(self): + pass + + def try_to_compress(self, sibling: SpanType) -> bool: + return False + + def is_compression_eligible(self) -> bool: + return False + + @property + def name(self): + return "DroppedSpan" + @property def type(self): return None @@ -587,12 +746,6 @@ def begin_transaction(self, transaction_type, trace_parent=None, start=None): sample_rate=sample_rate, ) if trace_parent is None: - transaction.trace_parent = TraceParent( - constants.TRACE_CONTEXT_VERSION, - "%032x" % random.getrandbits(128), - transaction.id, - TracingOptions(recorded=is_sampled), - ) 
transaction.trace_parent.add_tracestate(constants.TRACESTATE.SAMPLE_RATE, sample_rate) execution_context.set_transaction(transaction) return transaction @@ -641,17 +794,17 @@ class capture_span(object): def __init__( self, - name=None, - span_type="code.custom", - extra=None, - skip_frames=0, - leaf=False, - labels=None, - span_subtype=None, - span_action=None, - start=None, - duration=None, - sync=None, + name: Optional[str] = None, + span_type: str = "code.custom", + extra: Optional[dict] = None, + skip_frames: int = 0, + leaf: bool = False, + labels: Optional[dict] = None, + span_subtype: Optional[str] = None, + span_action: Optional[str] = None, + start: Optional[int] = None, + duration: Optional[int] = None, + sync: Optional[bool] = None, ): self.name = name if span_subtype is None and "." in span_type: @@ -672,7 +825,7 @@ def __init__( self.duration = duration self.sync = sync - def __call__(self, func): + def __call__(self, func: Callable) -> Callable: self.name = self.name or get_name_from_func(func) @functools.wraps(func) @@ -682,7 +835,7 @@ def decorated(*args, **kwds): return decorated - def __enter__(self): + def __enter__(self) -> Union[Span, DroppedSpan, None]: transaction = execution_context.get_transaction() if transaction and transaction.is_sampled: return transaction.begin_span( @@ -696,6 +849,7 @@ def __enter__(self): start=self.start, sync=self.sync, ) + return None def __exit__(self, exc_type, exc_val, exc_tb): transaction = execution_context.get_transaction() diff --git a/elasticapm/utils/__init__.py b/elasticapm/utils/__init__.py index 804ea57a8..b823a9e5d 100644 --- a/elasticapm/utils/__init__.py +++ b/elasticapm/utils/__init__.py @@ -32,6 +32,8 @@ import os import re from functools import partial +from types import FunctionType +from typing import Pattern from elasticapm.conf import constants from elasticapm.utils import compat, encoding @@ -73,7 +75,7 @@ def varmap(func, var, context=None, name=None, **kwargs): return ret -def 
get_name_from_func(func): +def get_name_from_func(func: FunctionType) -> str: # partials don't have `__module__` or `__name__`, so we use the values from the "inner" function if isinstance(func, partial_types): return "partial({})".format(get_name_from_func(func.func)) @@ -94,7 +96,7 @@ def build_name_with_http_method_prefix(name, request): return " ".join((request.method, name)) if name else name -def is_master_process(): +def is_master_process() -> bool: # currently only recognizes uwsgi master process try: import uwsgi @@ -104,7 +106,7 @@ def is_master_process(): return False -def get_url_dict(url): +def get_url_dict(url: str) -> dict: parse_result = compat.urlparse.urlparse(url) url_dict = { @@ -123,14 +125,14 @@ def get_url_dict(url): return url_dict -def sanitize_url(url): +def sanitize_url(url: str) -> str: if "@" not in url: return url parts = compat.urlparse.urlparse(url) return url.replace("%s:%s" % (parts.username, parts.password), "%s:%s" % (parts.username, constants.MASK)) -def get_host_from_url(url): +def get_host_from_url(url: str) -> str: parsed_url = compat.urlparse.urlparse(url) host = parsed_url.hostname or " " @@ -140,7 +142,7 @@ def get_host_from_url(url): return host -def url_to_destination(url, service_type="external"): +def url_to_destination(url: str, service_type: str = "external") -> dict: parts = compat.urlparse.urlsplit(url) hostname = parts.hostname if parts.hostname else "" # preserve brackets for IPv6 URLs @@ -163,7 +165,7 @@ def url_to_destination(url, service_type="external"): return {"service": {"name": name, "resource": resource, "type": service_type}} -def read_pem_file(file_obj): +def read_pem_file(file_obj) -> bytes: cert = b"" for line in file_obj: if line.startswith(b"-----BEGIN CERTIFICATE-----"): @@ -176,7 +178,7 @@ def read_pem_file(file_obj): return base64.b64decode(cert) -def starmatch_to_regex(pattern): +def starmatch_to_regex(pattern: str) -> Pattern: options = re.DOTALL # check if we are case sensitive if 
pattern.startswith("(?-i)"): diff --git a/tests/client/span_compression_tests.py b/tests/client/span_compression_tests.py new file mode 100644 index 000000000..38317bb7e --- /dev/null +++ b/tests/client/span_compression_tests.py @@ -0,0 +1,222 @@ +# BSD 3-Clause License +# +# Copyright (c) 2021, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+import pytest + +import elasticapm +from elasticapm.conf.constants import SPAN + + +@pytest.mark.parametrize( + "elasticapm_client", + [{"span_compression_same_kind_max_duration": "5ms", "span_compression_exact_match_max_duration": "5ms"}], + indirect=True, +) +def test_exact_match(elasticapm_client): + transaction = elasticapm_client.begin_transaction("test") + with elasticapm.capture_span( + "test", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span1: + assert span1.is_compression_eligible() + with elasticapm.capture_span( + "test", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=3, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span2: + assert span2.is_compression_eligible() + assert span1.is_exact_match(span2) + elasticapm_client.end_transaction("test") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 1 + span = spans[0] + assert "composite" in span + assert span["composite"]["count"] == 2 + assert span["composite"]["sum"] == 5000 + assert span["composite"]["compression_strategy"] == "exact_match" + + +@pytest.mark.parametrize( + "elasticapm_client", + [{"span_compression_same_kind_max_duration": "5ms", "span_compression_exact_match_max_duration": "5ms"}], + indirect=True, +) +def test_same_kind(elasticapm_client): + transaction = elasticapm_client.begin_transaction("test") + with elasticapm.capture_span( + "test1", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span1: + assert span1.is_compression_eligible() + with elasticapm.capture_span( + "test2", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=3, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span2: + assert span2.is_compression_eligible() + assert not span1.is_exact_match(span2) + assert 
span1.is_same_kind(span2) + elasticapm_client.end_transaction("test") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 1 + span = spans[0] + + assert span["name"] == "Calls to x" + assert "composite" in span + assert span["composite"]["count"] == 2 + assert span["composite"]["sum"] == 5000 + assert span["composite"]["compression_strategy"] == "same_kind" + + +@pytest.mark.parametrize( + "elasticapm_client", + [{"span_compression_same_kind_max_duration": "5ms", "span_compression_exact_match_max_duration": "5ms"}], + indirect=True, +) +def test_exact_match_after_same_kind(elasticapm_client): + # if a span that is an exact match is attempted to be compressed with a same_kind composite, it stays same_kind + transaction = elasticapm_client.begin_transaction("test") + with elasticapm.capture_span( + "test1", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span1: + assert span1.is_compression_eligible() + with elasticapm.capture_span( + "test2", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=3, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span2: + assert span2.is_compression_eligible() + assert not span1.is_exact_match(span2) + assert span1.is_same_kind(span2) + with elasticapm.capture_span( + "test1", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span3: + assert span3.is_compression_eligible() + elasticapm_client.end_transaction("test") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 1 + span = spans[0] + assert span["composite"]["compression_strategy"] == "same_kind" + assert span["composite"]["count"] == 3 + + +@pytest.mark.parametrize( + "elasticapm_client", + [{"span_compression_same_kind_max_duration": "5ms", "span_compression_exact_match_max_duration": "5ms"}], + indirect=True, +) +def 
test_nested_spans(elasticapm_client): + transaction = elasticapm_client.begin_transaction("test") + with elasticapm.capture_span("test", "x.y.z") as span1: + with elasticapm.capture_span( + "test1", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span2: + pass + with elasticapm.capture_span( + "test2", + span_type="a", + span_subtype="b", + span_action="c", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span3: + pass + assert span1.compression_buffer is span2 + assert span2.composite + # assert transaction.compression_buffer is span1 + # assert not span1.compression_buffer + elasticapm_client.end_transaction("test") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 2 + + +@pytest.mark.parametrize( + "elasticapm_client", + [{"span_compression_same_kind_max_duration": "5ms", "span_compression_exact_match_max_duration": "5ms"}], + indirect=True, +) +def test_buffer_is_reported_if_next_child_ineligible(elasticapm_client): + transaction = elasticapm_client.begin_transaction("test") + with elasticapm.capture_span("test", "x.y.z") as span1: + with elasticapm.capture_span( + "test", + "x.y.z", + leaf=True, + duration=2, + extra={"destination": {"service": {"resource": "x"}}}, + ) as span2: + pass + assert span1.compression_buffer is span2 + with elasticapm.capture_span("test", "x.y.z") as span3: + pass + assert span1.compression_buffer is None + elasticapm_client.end_transaction("test") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 3 diff --git a/tests/contrib/django/fixtures.py b/tests/contrib/django/fixtures.py index 776c9970c..f08193be6 100644 --- a/tests/contrib/django/fixtures.py +++ b/tests/contrib/django/fixtures.py @@ -57,6 +57,8 @@ def django_elasticapm_client(request): client_config.setdefault("service_name", "app") client_config.setdefault("secret_token", "secret") 
client_config.setdefault("span_frames_min_duration", -1) + client_config.setdefault("span_compression_exact_match_max_duration", "0ms") + client_config.setdefault("span_compression_same_kind_max_duration", "0ms") app = apps.get_app_config("elasticapm") old_client = app.client client = TempStoreClient(**client_config) @@ -84,6 +86,8 @@ def django_sending_elasticapm_client(request, validating_httpserver): client_config.setdefault("secret_token", "secret") client_config.setdefault("transport_class", "elasticapm.transport.http.Transport") client_config.setdefault("span_frames_min_duration", -1) + client_config.setdefault("span_compression_exact_match_max_duration", "0ms") + client_config.setdefault("span_compression_same_kind_max_duration", "0ms") app = apps.get_app_config("elasticapm") old_client = app.client client = DjangoClient(**client_config) diff --git a/tests/fixtures.py b/tests/fixtures.py index f0dcde004..03f83adf2 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -189,6 +189,8 @@ def elasticapm_client(request): client_config.setdefault("span_frames_min_duration", -1) client_config.setdefault("metrics_interval", "0ms") client_config.setdefault("cloud_provider", False) + client_config.setdefault("span_compression_exact_match_max_duration", "0ms") + client_config.setdefault("span_compression_same_kind_max_duration", "0ms") client = TempStoreClient(**client_config) yield client client.close() @@ -208,6 +210,8 @@ def elasticapm_client_log_file(request): client_config.setdefault("central_config", "false") client_config.setdefault("include_paths", ("*/tests/*",)) client_config.setdefault("span_frames_min_duration", -1) + client_config.setdefault("span_compression_exact_match_max_duration", "0ms") + client_config.setdefault("span_compression_same_kind_max_duration", "0ms") client_config.setdefault("metrics_interval", "0ms") client_config.setdefault("cloud_provider", False) client_config.setdefault("log_level", "warning") @@ -289,6 +293,8 @@ def 
sending_elasticapm_client(request, validating_httpserver): client_config.setdefault("secret_token", "test_key") client_config.setdefault("transport_class", "elasticapm.transport.http.Transport") client_config.setdefault("span_frames_min_duration", -1) + client_config.setdefault("span_compression_exact_match_max_duration", "0ms") + client_config.setdefault("span_compression_same_kind_max_duration", "0ms") client_config.setdefault("include_paths", ("*/tests/*",)) client_config.setdefault("metrics_interval", "0ms") client_config.setdefault("central_config", "false") diff --git a/tests/metrics/breakdown_tests.py b/tests/metrics/breakdown_tests.py index f29a9de98..a9062854c 100644 --- a/tests/metrics/breakdown_tests.py +++ b/tests/metrics/breakdown_tests.py @@ -172,15 +172,14 @@ def test_disable_breakdowns(elasticapm_client): "elasticapm.metrics.sets.transactions.TransactionsMetricSet" ) with mock.patch("elasticapm.traces.BaseSpan.child_started") as mock_child_started, mock.patch( - "elasticapm.traces.BaseSpan.child_ended" - ) as mock_child_ended, mock.patch("elasticapm.traces.Transaction.track_span_duration") as mock_track_span_duration: + "elasticapm.traces.Transaction.track_span_duration" + ) as mock_track_span_duration: transaction = elasticapm_client.begin_transaction("test") assert transaction._breakdown is None with elasticapm.capture_span("test", span_type="template", span_subtype="django", duration=5): pass elasticapm_client.end_transaction("test", "OK", duration=5) assert mock_child_started.call_count == 0 - assert mock_child_ended.call_count == 0 assert mock_track_span_duration.call_count == 0 # transaction duration should still be captured data = list(transaction_metrics.collect())