diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index c07200b274..f6aafeb06b 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -381,29 +381,41 @@ mod tests { use std::sync::mpsc; #[test] - fn collection_triggered_by_interval() { - // Arrange - let interval = std::time::Duration::from_millis(1); - let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone(), runtime::TokioCurrentThread) - .with_interval(interval) - .build(); - let (sender, receiver) = mpsc::channel(); + fn collection_triggered_by_interval_tokio_current() { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } - // Act - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let meter = meter_provider.meter("test"); - let _counter = meter - .u64_observable_counter("testcounter") - .with_callback(move |_| { - sender.send(()).expect("channel should still be open"); - }) - .init(); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } - // Assert - receiver - .recv() - .expect("message should be available in channel, indicating a collection occurred"); + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current() + { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current() + { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } + + #[tokio::test(flavor = "current_thread")] + #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"] + async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } + + #[tokio::test(flavor = "current_thread")] + async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); } #[test] @@ -424,4 +436,31 @@ mod tests { matches!(result.unwrap_err(), MetricsError::Other(err) if err == "reader is not registered") ); } + + fn collection_triggered_by_interval_helper(runtime: RT) + where + RT: crate::runtime::Runtime, + { + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime) + .with_interval(interval) + .build(); + let (sender, receiver) = mpsc::channel(); + + // Act + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let _counter = meter + .u64_observable_counter("testcounter") + .with_callback(move |_| { + sender.send(()).expect("channel should still be open"); + }) + .init(); + + // Assert + receiver + .recv() + .expect("message should be available in channel, indicating a collection occurred"); + } }