
WIP
ocelotl committed Aug 17, 2023
1 parent 1ea9c69 commit 5e795f5
Showing 10 changed files with 113 additions and 69 deletions.
1 change: 1 addition & 0 deletions opentelemetry-sdk/pyproject.toml
@@ -27,6 +27,7 @@ dependencies = [
"opentelemetry-api == 1.20.0.dev",
"opentelemetry-semantic-conventions == 0.41b0.dev",
"typing-extensions >= 3.7.4",
"ipdb"
]

[project.optional-dependencies]
@@ -16,7 +16,7 @@
from logging import getLogger
from threading import Lock
from time import time_ns
from typing import Dict, List, Sequence
from typing import Dict, List, Sequence, Optional

from opentelemetry.metrics import Instrument
from opentelemetry.sdk.metrics._internal.aggregation import (
@@ -126,7 +126,7 @@ def collect(
self,
aggregation_temporality: AggregationTemporality,
collection_start_nanos: int,
) -> Sequence[DataPointT]:
) -> Optional[Sequence[DataPointT]]:

data_points: List[DataPointT] = []
with self._lock:
@@ -136,4 +136,6 @@ def collect(
)
if data_point is not None:
data_points.append(data_point)
return data_points
if data_points:
return data_points
return None
@@ -322,10 +322,14 @@ def collect(self, timeout_millis: float = 10_000) -> None:
)
return

self._receive_metrics(
self._collect(self, timeout_millis=timeout_millis),
timeout_millis=timeout_millis,
)
metrics = self._collect(self, timeout_millis=timeout_millis)

if metrics is not None:

self._receive_metrics(
metrics,
timeout_millis=timeout_millis,
)

@final
def _set_collect_callback(
@@ -515,8 +519,7 @@ def _receive_metrics(
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics_data is None:
return

token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
with self._export_lock:
@@ -17,7 +17,7 @@
from abc import ABC, abstractmethod
from threading import Lock
from time import time_ns
from typing import Iterable, List, Mapping
from typing import Iterable, List, Mapping, Optional

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk.metrics
@@ -94,7 +94,7 @@ def collect(
self,
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
timeout_millis: float = 10_000,
) -> Iterable[Metric]:
) -> Optional[Iterable[Metric]]:

with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
@@ -123,4 +123,6 @@
for measurement in measurements:
metric_reader_storage.consume_measurement(measurement)

return self._reader_storages[metric_reader].collect()
result = self._reader_storages[metric_reader].collect()

return result
@@ -152,16 +152,21 @@ def collect(self) -> MetricsData:

for view_instrument_match in view_instrument_matches:

data_points = view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
)

if data_points is None:
continue

if isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_SumAggregation,
):
data = Sum(
aggregation_temporality=aggregation_temporality,
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
is_monotonic=isinstance(
instrument, (Counter, ObservableCounter)
),
@@ -171,20 +176,14 @@
view_instrument_match._aggregation,
_LastValueAggregation,
):
data = Gauge(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
)
)
data = Gauge(data_points=data_points)
elif isinstance(
# pylint: disable=protected-access
view_instrument_match._aggregation,
_ExplicitBucketHistogramAggregation,
):
data = Histogram(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
aggregation_temporality=aggregation_temporality,
)
elif isinstance(
@@ -200,9 +199,7 @@
_ExponentialBucketHistogramAggregation,
):
data = ExponentialHistogram(
data_points=view_instrument_match.collect(
aggregation_temporality, collection_start_nanos
),
data_points=data_points,
aggregation_temporality=aggregation_temporality,
)

@@ -216,32 +213,38 @@
)
)

if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
):
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
] = ScopeMetrics(
scope=instrument.instrumentation_scope,
metrics=metrics,
schema_url=instrument.instrumentation_scope.schema_url,
if metrics:

if instrument.instrumentation_scope not in (
instrumentation_scope_scope_metrics
):
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
] = ScopeMetrics(
scope=instrument.instrumentation_scope,
metrics=metrics,
schema_url=instrument.instrumentation_scope.schema_url,
)
else:
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
].metrics.extend(metrics)

if instrumentation_scope_scope_metrics:

return MetricsData(
resource_metrics=[
ResourceMetrics(
resource=self._sdk_config.resource,
scope_metrics=list(
instrumentation_scope_scope_metrics.values()
),
schema_url=self._sdk_config.resource.schema_url,
)
else:
instrumentation_scope_scope_metrics[
instrument.instrumentation_scope
].metrics.extend(metrics)

return MetricsData(
resource_metrics=[
ResourceMetrics(
resource=self._sdk_config.resource,
scope_metrics=list(
instrumentation_scope_scope_metrics.values()
),
schema_url=self._sdk_config.resource.schema_url,
)
]
)
]
)

return None

def _handle_view_instrument_match(
self,
@@ -72,3 +72,19 @@ def test_console_exporter(self):

self.assertEqual(metrics["attributes"], {"a": "b"})
self.assertEqual(metrics["value"], 1)

def test_console_exporter_no_export(self):

output = StringIO()
exporter = ConsoleMetricExporter(out=output)
reader = PeriodicExportingMetricReader(
exporter, export_interval_millis=100
)
provider = MeterProvider(metric_readers=[reader])
provider.shutdown()

output.seek(0)
actual = "".join(output.readlines())
expected = ""

self.assertEqual(actual, expected)
@@ -31,15 +31,7 @@ def test_disable_default_views(self):
counter.add(10, {"label": "value1"})
counter.add(10, {"label": "value2"})
counter.add(10, {"label": "value3"})
self.assertEqual(
(
reader.get_metrics_data()
.resource_metrics[0]
.scope_metrics[0]
.metrics
),
[],
)
self.assertIsNone(reader.get_metrics_data())

def test_disable_default_views_add_custom(self):
reader = InMemoryMetricReader()
@@ -74,6 +74,15 @@ class TestExporterConcurrency(ConcurrencyTestBase):
> be called again only after the current call returns.
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch
This test also verifies that a thread that calls the
``MetricReader.collect`` method using an asynchronous instrument is able
to perform two actions while holding the same lock (without being
interrupted by another thread):

1. Consume the measurement produced by the callback associated with the
asynchronous instrument.
2. Export the measurement mentioned in the step above.
"""

def test_exporter_not_called_concurrently(self):
@@ -84,7 +93,11 @@ def test_exporter_not_called_concurrently(self):
)
meter_provider = MeterProvider(metric_readers=[reader])

counter_cb_counter = 0

def counter_cb(options: CallbackOptions):
nonlocal counter_cb_counter
counter_cb_counter += 1
yield Observation(2)

meter_provider.get_meter(__name__).create_observable_counter(
@@ -97,6 +110,7 @@ def test_many_threads():

self.run_with_many_threads(test_many_threads, num_threads=100)

self.assertEqual(counter_cb_counter, 100)
# no thread should be in export() now
self.assertEqual(exporter.count_in_export, 0)
# should be one call for each thread
10 changes: 1 addition & 9 deletions opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py
@@ -314,15 +314,7 @@ def test_drop_aggregation(self):
)
metric_reader_storage.consume_measurement(Measurement(1, counter))

self.assertEqual(
[],
(
metric_reader_storage.collect()
.resource_metrics[0]
.scope_metrics[0]
.metrics
),
)
self.assertIsNone(metric_reader_storage.collect())

def test_same_collection_start(self):

19 changes: 19 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_no_data.py
@@ -0,0 +1,19 @@
from time import sleep
from unittest import TestCase

from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    PeriodicExportingMetricReader,
)


class TestNoData(TestCase):
    def test_no_data(self):
        reader = PeriodicExportingMetricReader(
            ConsoleMetricExporter(), export_interval_millis=10
        )
        # No instruments are created, so the periodic reader should have
        # nothing to export while we wait out several export intervals.
        provider = MeterProvider(metric_readers=[reader])
        sleep(1)
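
Taken together, the changes in this commit make the collection path report
"no data" as None instead of as empty containers, and make the reader skip
the export step entirely in that case. Below is a minimal, self-contained
sketch of the resulting control flow; FakeStorage and FakeReader are
hypothetical stand-ins used only for illustration, not the actual SDK
classes.

from typing import Callable, List, Optional, Sequence


class FakeStorage:
    """Hypothetical stand-in for the SDK's per-reader storage."""

    def __init__(self, data_points: Sequence[int]) -> None:
        self._data_points = list(data_points)

    def collect(self) -> Optional[List[int]]:
        # As in this commit, "nothing to export" is reported as None
        # rather than as an empty container.
        if not self._data_points:
            return None
        points, self._data_points = list(self._data_points), []
        return points


class FakeReader:
    """Hypothetical stand-in for a metric reader wired to an exporter."""

    def __init__(
        self, storage: FakeStorage, export: Callable[[List[int]], None]
    ) -> None:
        self._storage = storage
        self._export = export

    def collect(self) -> None:
        metrics = self._storage.collect()
        # Mirrors the new `if metrics is not None` guard: the exporter is
        # never invoked when there is no data.
        if metrics is not None:
            self._export(metrics)


exported: List[List[int]] = []

FakeReader(FakeStorage([]), exported.append).collect()
assert exported == []  # no data -> the export callable is never called

FakeReader(FakeStorage([1, 2, 3]), exported.append).collect()
assert exported == [[1, 2, 3]]  # data still flows through unchanged

This is the same behavior the new and updated tests above check from the
outside: the console exporter writes nothing when no instruments record
data, and the in-memory reader and reader-storage collect calls return
None instead of empty metrics objects.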
