diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 05f8e0e223..e09cccb8c1 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -421,32 +421,41 @@ mod tests { // cargo test metrics::periodic_reader::tests --features=testing -- --nocapture #[test] - fn collection_triggered_by_interval() { - // Arrange - let interval = std::time::Duration::from_millis(1000); - let exporter = InMemoryMetricsExporter::default(); - let reader = PeriodicReader::builder(exporter.clone()) - .with_interval(interval) - .build(); - let (sender, receiver) = mpsc::channel(); + fn collection_triggered_by_interval_tokio_current() { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } - // Act - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); - let meter = meter_provider.meter("test"); - let _counter = meter - .u64_observable_counter("testcounter") - .with_callback(move |_| { - sender.send(()).expect("channel should still be open"); - }) - .init(); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } - // Sleep for a duration longer than the interval to ensure at least one collection - std::thread::sleep(interval * 2); + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } - // Assert - receiver - .recv_timeout(Duration::ZERO) - .expect("message should be available in channel, indicating a collection occurred"); + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current() + { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current() + { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); + } + + #[tokio::test(flavor = "current_thread")] + #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"] + async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() { + collection_triggered_by_interval_helper(runtime::Tokio); + } + + #[tokio::test(flavor = "current_thread")] + async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() { + collection_triggered_by_interval_helper(runtime::TokioCurrentThread); } #[test] @@ -551,25 +560,40 @@ mod tests { resource: Resource::empty(), scope_metrics: Vec::new(), }; - // Pipeline is not registered, so collect should return an error - let result = reader.collect(rm); - assert!(result.is_err()); - // Pipeline is not registered, so flush should return an error - let result = reader.force_flush(); - assert!(result.is_err()); + // Act + let result = reader.collect(&mut rm); + + // Assert + assert!( + matches!(result.unwrap_err(), MetricsError::Other(err) if err == "reader is not registered") + ); + } - // Adding reader to meter provider should register the pipeline - // TODO: This part might benefit from a different design. - let meter_provider = SdkMeterProvider::builder() - .with_reader(reader.clone()) + fn collection_triggered_by_interval_helper(runtime: RT) + where + RT: crate::runtime::Runtime, + { + let interval = std::time::Duration::from_millis(1); + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime) + .with_interval(interval) .build(); + let (sender, receiver) = mpsc::channel(); - // Now collect and flush should succeed - let result = reader.collect(rm); - assert!(result.is_ok()); + // Act + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter = meter_provider.meter("test"); + let _counter = meter + .u64_observable_counter("testcounter") + .with_callback(move |_| { + sender.send(()).expect("channel should still be open"); + }) + .init(); - let result = meter_provider.force_flush(); - assert!(result.is_ok()); + // Assert + receiver + .recv() + .expect("message should be available in channel, indicating a collection occurred"); } }