Skip to content

Commit

Permalink
Add tests for periodic reader from various RT combinations (#2147)
Browse files Browse the repository at this point in the history
These tests should help with testing #2142
  • Loading branch information
cijothomas authored Sep 25, 2024
1 parent c8136d9 commit 88023d9
Showing 1 changed file with 60 additions and 21 deletions.
81 changes: 60 additions & 21 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,29 +381,41 @@ mod tests {
use std::sync::mpsc;

#[test]
fn collection_triggered_by_interval() {
// Arrange
let interval = std::time::Duration::from_millis(1);
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::TokioCurrentThread)
.with_interval(interval)
.build();
let (sender, receiver) = mpsc::channel();
fn collection_triggered_by_interval_tokio_current() {
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

// Act
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let _counter = meter
.u64_observable_counter("testcounter")
.with_callback(move |_| {
sender.send(()).expect("channel should still be open");
})
.init();
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

// Assert
receiver
.recv()
.expect("message should be available in channel, indicating a collection occurred");
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current()
{
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current()
{
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"]
async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() {
collection_triggered_by_interval_helper(runtime::Tokio);
}

#[tokio::test(flavor = "current_thread")]
async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() {
collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
}

#[test]
Expand All @@ -424,4 +436,31 @@ mod tests {
matches!(result.unwrap_err(), MetricsError::Other(err) if err == "reader is not registered")
);
}

fn collection_triggered_by_interval_helper<RT>(runtime: RT)
where
RT: crate::runtime::Runtime,
{
let interval = std::time::Duration::from_millis(1);
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime)
.with_interval(interval)
.build();
let (sender, receiver) = mpsc::channel();

// Act
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
let meter = meter_provider.meter("test");
let _counter = meter
.u64_observable_counter("testcounter")
.with_callback(move |_| {
sender.send(()).expect("channel should still be open");
})
.init();

// Assert
receiver
.recv()
.expect("message should be available in channel, indicating a collection occurred");
}
}

0 comments on commit 88023d9

Please sign in to comment.