Fix handling of empty metric collection cycles #3335

Merged · 14 commits · Sep 7, 2023
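
The user-visible effect, in a quick hedged sketch (illustrative snippet, not part of this PR's diff; it assumes the post-change behavior where an empty collection cycle yields no metrics data rather than an empty payload):

```python
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

reader = InMemoryMetricReader()
provider = MeterProvider(metric_readers=[reader])

# Nothing has been recorded yet, so the collection cycle is empty. With this
# fix the reader reports no metrics data at all instead of an empty object.
assert reader.get_metrics_data() is None

provider.shutdown()
```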
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -7,9 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Fix handling of empty metric collection cycles
([#3335](https://github.com/open-telemetry/opentelemetry-python/pull/3335))
- Fix error when no LoggerProvider configured for LoggingHandler
([#3423](https://github.com/open-telemetry/opentelemetry-python/pull/3423))



## Version 1.20.0/0.41b0 (2023-09-04)

- Modify Prometheus exporter to translate non-monotonic Sums into Gauges
@@ -16,7 +16,7 @@
from logging import getLogger
from threading import Lock
from time import time_ns
from typing import Dict, List, Sequence
from typing import Dict, List, Optional, Sequence

from opentelemetry.metrics import Instrument
from opentelemetry.sdk.metrics._internal.aggregation import (
@@ -126,7 +126,7 @@ def collect(
self,
aggregation_temporality: AggregationTemporality,
collection_start_nanos: int,
) -> Sequence[DataPointT]:
) -> Optional[Sequence[DataPointT]]:

data_points: List[DataPointT] = []
with self._lock:
@@ -136,4 +136,8 @@ def collect(
)
if data_point is not None:
data_points.append(data_point)
return data_points

# Return None instead of an empty list because the caller does not
# consume a sequence, and to stay consistent with the other collect
# methods that also return None.
return data_points or None
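
For context on the `or None` idiom above: an empty list is falsy, so the expression collapses "no data points" into `None`, and callers are expected to guard on that. A minimal, self-contained sketch of the pattern (the names below are illustrative, not the SDK's actual call chain):

```python
from typing import List, Optional

def collect(points: List[int]) -> Optional[List[int]]:
    # An empty list is falsy, so `points or None` yields None when there is
    # nothing to report, and the list itself otherwise.
    return points or None

def export_if_any(points: List[int]) -> None:
    result = collect(points)
    if result is not None:
        # Only reached when the cycle actually produced data points.
        print(f"exporting {len(result)} data points")

export_if_any([])      # prints nothing: empty cycle, no export call
export_if_any([1, 2])  # prints: exporting 2 data points
```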
@@ -322,10 +322,14 @@ def collect(self, timeout_millis: float = 10_000) -> None:
)
return

self._receive_metrics(
self._collect(self, timeout_millis=timeout_millis),
timeout_millis=timeout_millis,
)
metrics = self._collect(self, timeout_millis=timeout_millis)

if metrics is not None:

self._receive_metrics(
metrics,
timeout_millis=timeout_millis,
)

@final
def _set_collect_callback(
@@ -515,8 +519,7 @@ def _receive_metrics(
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics_data is None:
return

token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
with self._export_lock:
@@ -17,7 +17,7 @@
from abc import ABC, abstractmethod
from threading import Lock
from time import time_ns
from typing import Iterable, List, Mapping
from typing import Iterable, List, Mapping, Optional

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk.metrics
@@ -51,7 +51,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Iterable[Metric]:
) -> Optional[Iterable[Metric]]:
pass


@@ -94,7 +94,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Iterable[Metric]:
) -> Optional[Iterable[Metric]]:

with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
@@ -123,4 +123,6 @@
for measurement in measurements:
metric_reader_storage.consume_measurement(measurement)

return self._reader_storages[metric_reader].collect()
result = self._reader_storages[metric_reader].collect()

return result
@@ -15,7 +15,7 @@
from logging import getLogger
from threading import RLock
from time import time_ns
from typing import Dict, List
from typing import Dict, List, Optional

from opentelemetry.metrics import (
Asynchronous,
@@ -119,7 +119,7 @@ def consume_measurement(self, measurement: Measurement) -> None:
):
view_instrument_match.consume_measurement(measurement)

def collect(self) -> MetricsData:
def collect(self) -> Optional[MetricsData]:
# Use a list instead of yielding to prevent a slow reader from holding
# SDK locks

@@ -152,16 +152,21 @@ def collect(self) -> MetricsData:

for view_instrument_match in view_instrument_matches:

data_points = view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
)

if data_points is None:
continue

if isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_SumAggregation,
):
data = Sum(
aggregation_temporality=aggregation_temporality,
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
is_monotonic=isinstance(
instrument, (Counter, ObservableCounter)
),
@@ -171,20 +176,14 @@
view_instrument_match._aggregation,
_LastValueAggregation,
):
data = Gauge(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
)
)
data = Gauge(data_points=data_points)
elif isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_ExplicitBucketHistogramAggregation,
):
data = Histogram(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
aggregation_temporality=aggregation_temporality,
)
elif isinstance(
@@ -200,9 +199,7 @@
_ExponentialBucketHistogramAggregation,
):
data = ExponentialHistogram(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
aggregation_temporality=aggregation_temporality,
)

@@ -216,32 +213,38 @@
)
)

if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
):
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
] = ScopeMetrics(
scope=instrument.instrumentation_scope,
metrics=metrics,
schema_url=instrument.instrumentation_scope.schema_url,
)
else:
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
].metrics.extend(metrics)

return MetricsData(
resource_metrics=[
ResourceMetrics(
resource=self._sdk_config.resource,
scope_metrics=list(
instrumentation_scope_scope_metrics.values()
),
schema_url=self._sdk_config.resource.schema_url,
if metrics:

if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
):
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
] = ScopeMetrics(
scope=instrument.instrumentation_scope,
metrics=metrics,
schema_url=instrument.instrumentation_scope.schema_url,
)
else:
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
].metrics.extend(metrics)

if instrumentation_scope_scope_metrics:

return MetricsData(
resource_metrics=[
ResourceMetrics(
resource=self._sdk_config.resource,
scope_metrics=list(
instrumentation_scope_scope_metrics.values()
),
schema_url=self._sdk_config.resource.schema_url,
)
]
)
]
)

return None

def _handle_view_instrument_match(
self,
@@ -72,3 +72,19 @@ def test_console_exporter(self):

self.assertEqual(metrics["attributes"], {"a": "b"})
self.assertEqual(metrics["value"], 1)

def test_console_exporter_no_export(self):

output = StringIO()
exporter = ConsoleMetricExporter(out=output)
reader = PeriodicExportingMetricReader(
exporter, export_interval_millis=100
)
provider = MeterProvider(metric_readers=[reader])
provider.shutdown()

output.seek(0)
actual = "".join(output.readlines())
expected = ""

self.assertEqual(actual, expected)
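
For contrast with the empty-cycle test above, a hedged sketch of the same console-exporter setup when a measurement is recorded (illustrative, not part of this PR's diff; the meter and counter names are made up):

```python
from io import StringIO

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)

output = StringIO()
exporter = ConsoleMetricExporter(out=output)
reader = PeriodicExportingMetricReader(exporter, export_interval_millis=100)
provider = MeterProvider(metric_readers=[reader])

# Record one data point, so this collection cycle is not empty.
counter = provider.get_meter("example-meter").create_counter("requests")
counter.add(1, {"a": "b"})

# Shutdown flushes the reader, which collects and exports the recorded point.
provider.shutdown()

assert output.getvalue() != ""  # the exporter printed something this time
```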
@@ -31,15 +31,7 @@ def test_disable_default_views(self):
counter.add(10, {"label": "value1"})
counter.add(10, {"label": "value2"})
counter.add(10, {"label": "value3"})
self.assertEqual(
(
reader.get_metrics_data()
.resource_metrics[0]
.scope_metrics[0]
.metrics
),
[],
)
self.assertIsNone(reader.get_metrics_data())

def test_disable_default_views_add_custom(self):
reader = InMemoryMetricReader()
@@ -74,6 +74,15 @@ class TestExporterConcurrency(ConcurrencyTestBase):
> be called again only after the current call returns.

https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch

This test also verifies that a thread that calls the
``MetricReader.collect`` method using an asynchronous instrument is able
to perform two actions in the same thread lock space (without being
interrupted by another thread):

1. Consume the measurement produced by the callback associated with the
asynchronous instrument.
2. Export the measurement mentioned in the step above.
"""

def test_exporter_not_called_concurrently(self):
@@ -84,7 +93,11 @@ def test_exporter_not_called_concurrently(self):
)
meter_provider = MeterProvider(metric_readers=[reader])

counter_cb_counter = 0

def counter_cb(options: CallbackOptions):
nonlocal counter_cb_counter
counter_cb_counter += 1
yield Observation(2)

meter_provider.get_meter(__name__).create_observable_counter(
Expand All @@ -97,6 +110,7 @@ def test_many_threads():

self.run_with_many_threads(test_many_threads, num_threads=100)

self.assertEqual(counter_cb_counter, 100)
# no thread should be in export() now
self.assertEqual(exporter.count_in_export, 0)
# should be one call for each thread