diff --git a/rust/otap-dataflow/crates/config/src/pipeline/telemetry.rs b/rust/otap-dataflow/crates/config/src/pipeline/telemetry.rs index 7dbe3c0d37..b448045952 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline/telemetry.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline/telemetry.rs @@ -36,6 +36,11 @@ pub struct TelemetryConfig { impl TelemetryConfig { /// Validates the telemetry configuration, including all metric readers. pub fn validate(&self) -> Result<(), crate::error::Error> { + if self.reporting_interval.is_zero() { + return Err(crate::error::Error::InvalidUserConfig { + error: "engine.telemetry.reporting_interval must be greater than zero".to_string(), + }); + } self.metrics.validate() } } @@ -418,6 +423,19 @@ mod tests { assert_eq!(config.metrics.readers.len(), 0); } + #[test] + fn test_zero_reporting_interval_rejected() { + let config = TelemetryConfig { + reporting_interval: Duration::ZERO, + ..Default::default() + }; + let err = config.validate().unwrap_err(); + assert!( + err.to_string().contains("greater than zero"), + "expected zero-interval error, got: {err}" + ); + } + #[test] fn test_attribute_value_deserialize_yaml() { let yaml_str = r#" diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs index f8db79fa52..94845cacd8 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs @@ -486,14 +486,6 @@ impl Exporter for AzureMonitorExporter { } })?; - // Start periodic telemetry collection and retain the cancel handle for graceful shutdown - let telemetry_timer_cancel_handle = effect_handler - .start_periodic_telemetry(std::time::Duration::from_secs(1)) - .await - .map_err(|e| EngineError::InternalError { - message: format!("Failed to start telemetry timer: {e}"), - })?; - let mut next_periodic_export = tokio::time::Instant::now() + tokio::time::Duration::from_secs(PERIODIC_EXPORT_INTERVAL); let mut next_heartbeat_send = tokio::time::Instant::now(); @@ -639,7 +631,6 @@ impl Exporter for AzureMonitorExporter { let _ = self.metrics.borrow_mut().report(&mut metrics_reporter); } Ok(Message::Control(NodeControlMsg::Shutdown { deadline, .. })) => { - let _ = telemetry_timer_cancel_handle.cancel().await; self.handle_shutdown(&effect_handler).await?; let snapshot = self.metrics.borrow().metrics().snapshot(); return Ok(TerminalState::new( diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs index 7a693f9842..bfa929ca44 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs @@ -52,7 +52,7 @@ use otap_df_telemetry_macros::metric_set; use serde::Deserialize; use std::path::PathBuf; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; // Geneva uploader dependencies use futures::StreamExt; @@ -709,12 +709,6 @@ impl Exporter for GenevaExporter { message = "Geneva exporter starting" ); - // Start periodic telemetry collection so CollectTelemetry messages - // are delivered to this exporter's message channel. - let timer_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - // Message loop loop { match msg_chan.recv().await? { @@ -724,7 +718,6 @@ impl Exporter for GenevaExporter { message = "Geneva exporter shutting down" ); - _ = timer_cancel_handle.cancel().await; return Ok(TerminalState::new( deadline, [self.pdata_metrics.snapshot(), self.metrics.snapshot()], diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs index 1abed86402..1ff7d8620d 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs @@ -388,10 +388,6 @@ impl local::Exporter for OTAPExporter { } })?; - let timer_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - // start a grpc client and connect to the server let mut arrow_metrics_client = ArrowMetricsServiceClient::new(channel.clone()); let mut arrow_logs_client = ArrowLogsServiceClient::new(channel.clone()); @@ -498,7 +494,6 @@ impl local::Exporter for OTAPExporter { &effect_handler, ) .await?; - _ = timer_cancel_handle.cancel().await; self.export_latency_window .report_into(&mut self.async_metrics); return Ok(TerminalState::new( diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs index b21f171f24..b0e06bc16b 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs @@ -46,7 +46,6 @@ use otap_df_telemetry::{otel_debug, otel_info, otel_warn}; use serde::Deserialize; use std::future::Future; use std::sync::Arc; -use std::time::Duration; use tonic::codec::CompressionEncoding; use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue}; use tonic::transport::Channel; @@ -132,9 +131,6 @@ impl Exporter for OTLPExporter { self.config.grpc.log_proxy_info(); let exporter_id = effect_handler.exporter_id(); - let timer_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; let channel = self .config @@ -246,7 +242,6 @@ impl Exporter for OTLPExporter { grpc_clients.release(client); } } - _ = timer_cancel_handle.cancel().await; return Ok(TerminalState::new(deadline, [self.pdata_metrics])); } Message::Control(NodeControlMsg::CollectTelemetry { diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_http_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_http_exporter/mod.rs index 5c031fbe5a..4905269b1a 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_http_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_http_exporter/mod.rs @@ -16,7 +16,6 @@ use std::num::NonZeroUsize; use std::rc::Rc; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; @@ -227,10 +226,6 @@ impl Exporter for OtlpHttpExporter { traces_endpoint = traces_endpoint.as_str(), ); - let telemetry_timer_cancel = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - let max_in_flight = self.config.max_in_flight.max(1); let mut client_pool = HttpClientPool::try_new(&self.config.http, self.config.client_pool_size) @@ -309,7 +304,6 @@ impl Exporter for OtlpHttpExporter { .await; } } - _ = telemetry_timer_cancel.cancel().await; return Ok(TerminalState::new(deadline, [self.pdata_metrics])); } Message::Control(NodeControlMsg::CollectTelemetry { diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs index 73f26af615..d8ae5b8e9f 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs @@ -189,11 +189,6 @@ impl Exporter for ParquetExporter { .await?; } - // Start periodic telemetry collection (internal metrics) - let telemetry_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - let mut writer = writer::WriterManager::new(object_store, writer_options); let mut batch_id = 0; let mut id_generator = PartitionSequenceIdGenerator::new(); @@ -257,7 +252,6 @@ impl Exporter for ParquetExporter { // granularity is ~15 ms), so an explicit check avoids a // race between the timeout and the flush future. if deadline.checked_duration_since(Instant::now()).is_none() { - let _ = telemetry_cancel_handle.cancel().await; return Err(Error::IoError { node: exporter_id.clone(), error: std::io::Error::from(ErrorKind::TimedOut), @@ -268,15 +262,7 @@ impl Exporter for ParquetExporter { let flush_all = writer.flush_all().fuse(); pin_mut!(flush_all); // Stop telemetry loop concurrently with flushing; do not block shutdown on cancel - let cancel_fut = async { - let _ = telemetry_cancel_handle.cancel().await; - futures::future::pending::<()>().await - } - .fuse(); - pin_mut!(cancel_fut); - return futures::select_biased! { - _ = cancel_fut => unreachable!(), _timeout = timeout => Err(Error::IoError { node: exporter_id.clone(), error: std::io::Error::from(ErrorKind::TimedOut) @@ -1217,92 +1203,6 @@ mod test { exporter_result.unwrap(); } - #[test] - fn test_starts_telemetry_timer() { - use otap_df_engine::control::runtime_ctrl_msg_channel; - use otap_df_engine::testing::test_node; - - let test_runtime = TestRuntime::::new(); - let temp_dir = tempfile::tempdir().unwrap(); - let base_dir: String = temp_dir.path().to_str().unwrap().into(); - let exporter = ParquetExporter::new(config::Config { - storage: object_store::StorageType::File { - base_uri: base_dir.clone(), - }, - partitioning_strategies: None, - writer_options: None, - }); - let node_config = Arc::new(NodeUserConfig::new_exporter_config(PARQUET_EXPORTER_URN)); - let mut exporter = ExporterWrapper::::local::( - exporter, - test_node(test_runtime.config().name.clone()), - node_config, - test_runtime.config(), - ); - - let (rt, _) = setup_test_runtime(); - let control_sender = exporter.control_sender(); - let (pdata_tx, pdata_rx) = create_not_send_channel::(1); - let _pdata_tx = Sender::Local(LocalSender::mpsc(pdata_tx)); - let pdata_rx = Receiver::Local(LocalReceiver::mpsc(pdata_rx)); - - let (runtime_ctrl_msg_tx, mut runtime_ctrl_msg_rx) = runtime_ctrl_msg_channel(10); - let (pipeline_completion_msg_tx, _pipeline_completion_msg_rx) = - pipeline_completion_msg_channel::(10); - - exporter - .set_pdata_receiver(test_node("exp"), pdata_rx) - .expect("Failed to set PData Receiver"); - - async fn start_exporter( - exporter: ExporterWrapper, - runtime_ctrl_msg_tx: RuntimeCtrlMsgSender, - pipeline_completion_msg_tx: PipelineCompletionMsgSender, - ) -> Result<(), Error> { - let (_metrics_rx, metrics_reporter) = - otap_df_telemetry::reporter::MetricsReporter::create_new_and_receiver(1); - exporter - .start( - runtime_ctrl_msg_tx, - pipeline_completion_msg_tx, - metrics_reporter, - Interests::empty(), - ) - .await - .map(|_| ()) - } - - let (_exporter_result, _ignored) = rt.block_on(async move { - tokio::join!( - start_exporter(exporter, runtime_ctrl_msg_tx, pipeline_completion_msg_tx), - async move { - // Expect StartTelemetryTimer quickly after startup - let msg = tokio::time::timeout(Duration::from_millis(1500), async { - runtime_ctrl_msg_rx.recv().await - }) - .await - .expect("timed out waiting for StartTelemetryTimer") - .expect("runtime-control channel closed"); - - match msg { - RuntimeControlMsg::StartTelemetryTimer { duration, .. } => { - assert_eq!(duration, Duration::from_secs(1)); - } - other => panic!("Expected StartTelemetryTimer, got {other:?}"), - } - - // Shutdown exporter to end the test - let _ = control_sender - .send(NodeControlMsg::Shutdown { - deadline: Instant::now(), - reason: "done".into(), - }) - .await; - } - ) - }); - } - #[test] fn test_traces() { let test_runtime = TestRuntime::::new(); diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs index fdcefafab9..f49f268ba2 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs @@ -48,7 +48,6 @@ use otap_df_telemetry::otel_info; use serde_json::Value; use std::sync::Arc; use std::time::Instant; -use tokio::time::Duration; /// The URN for the OTAP Perf exporter pub const OTAP_PERF_EXPORTER_URN: &str = "urn:otel:exporter:perf"; @@ -143,11 +142,6 @@ impl local::Exporter for PerfExporter { message = "Starting Perf Exporter" ); - // Start telemetry collection tick as a dedicated control message. - let timer_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_millis(self.config.frequency())) - .await?; - // Loop until a Shutdown event is received. loop { let msg = msg_chan.recv().await?; @@ -161,7 +155,6 @@ impl local::Exporter for PerfExporter { // ToDo: Handle configuration changes Message::Control(NodeControlMsg::Config { .. }) => {} Message::Control(NodeControlMsg::Shutdown { deadline, .. }) => { - _ = timer_cancel_handle.cancel().await; return Ok(self.terminal_state(deadline)); } Message::PData(mut pdata) => { diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs index babafa573d..1980b2d7a0 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs @@ -36,7 +36,6 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::time::Duration; /// URN for the topic exporter. pub const TOPIC_EXPORTER_URN: &str = "urn:otel:exporter:topic"; @@ -404,9 +403,6 @@ impl Exporter for TopicExporter { ack_propagation = format!("{ack_propagation_mode:?}"), message = "Topic exporter started" ); - let telemetry_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; let run_result: Result<(), Error> = async { loop { @@ -583,7 +579,6 @@ impl Exporter for TopicExporter { } .await; - _ = telemetry_cancel_handle.cancel().await; run_result?; Ok(TerminalState::default()) } diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/mod.rs index 6b65a934cb..34cc43b5b6 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/mod.rs @@ -481,10 +481,6 @@ impl local::Receiver for HostMetricsReceiver { })?; let mut scheduler = FamilyScheduler::new(&config, Instant::now()); - let _ = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - loop { tokio::select! { biased; diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs index 8f10d6f8a2..19d168eb40 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs @@ -113,11 +113,6 @@ impl local::Receiver for InternalTelemetryReceiver { let log_tap = internal.log_tap; let mut scope_cache = ScopeToBytesMap::new(internal.registry); - // Start periodic telemetry collection - let _ = effect_handler - .start_periodic_telemetry(std::time::Duration::from_secs(1)) - .await?; - loop { tokio::select! { biased; diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/otap_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/otap_receiver/mod.rs index 0578c37100..b629da31e7 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/otap_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/otap_receiver/mod.rs @@ -425,13 +425,6 @@ impl shared::Receiver for OTAPReceiver { .add_service(metrics_server) .add_service(traces_server); - // Start periodic telemetry collection - let mut telemetry_cancel_handle = Some( - effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?, - ); - let grpc_shutdown = CancellationToken::new(); let server_task = { let grpc_shutdown = grpc_shutdown.clone(); @@ -482,9 +475,6 @@ impl shared::Receiver for OTAPReceiver { } if server_task_done && states.is_empty() { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } effect_handler.notify_receiver_drained().await?; self.flush_memory_pressure_metrics(); terminal_state = TerminalState::new(deadline, [self.metrics.snapshot()]); @@ -522,9 +512,6 @@ impl shared::Receiver for OTAPReceiver { otap_df_telemetry::otel_info!("otap_receiver.shutdown"); grpc_shutdown.cancel(); states.force_shutdown(&reason); - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } self.flush_memory_pressure_metrics(); terminal_state = TerminalState::new(deadline, [self.metrics.snapshot()]); break; @@ -543,9 +530,6 @@ impl shared::Receiver for OTAPReceiver { self.handle_nack_response(self.route_nack_response(&states, nack)); } Err(e) => { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } return Err(Error::ChannelRecvError(e)); } _ => {} @@ -565,9 +549,6 @@ impl shared::Receiver for OTAPReceiver { } if draining_deadline.is_none() { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } self.flush_memory_pressure_metrics(); terminal_state = TerminalState::new( clock::now().add(Duration::from_secs(1)), diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs index 70909290e7..65ed78e451 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs @@ -67,9 +67,6 @@ use tower::util::Either; /// URN for the OTLP Receiver pub const OTLP_RECEIVER_URN: &str = "urn:otel:receiver:otlp"; -/// Interval for periodic telemetry collection. -const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1); - /// Configuration for OTLP Receiver. /// /// The receiver supports three deployment modes matching the Go collector's `otlpreceiver`: @@ -390,9 +387,6 @@ impl OTLPReceiver { &mut self, msg: NodeControlMsg, registry: &AckRegistry, - _telemetry_cancel_handle: &mut Option< - otap_df_engine::effect_handler::TelemetryTimerCancelHandle, - >, ) -> Result<(), Error> { match msg { NodeControlMsg::CollectTelemetry { @@ -622,19 +616,12 @@ impl shared::Receiver for OTLPReceiver { None }; - let mut telemetry_cancel_handle = Some( - effect_handler - .start_periodic_telemetry(TELEMETRY_INTERVAL) - .await?, - ); - // Run the event loop based on which protocols are enabled. let terminal_state = self .run_event_loop( &mut ctrl_msg_recv, &effect_handler, &ack_registry, - &mut telemetry_cancel_handle, grpc_task, grpc_shutdown, http_task, @@ -659,9 +646,6 @@ impl OTLPReceiver { ctrl_msg_recv: &mut shared::ControlChannel, effect_handler: &shared::EffectHandler, ack_registry: &AckRegistry, - telemetry_cancel_handle: &mut Option< - otap_df_engine::effect_handler::TelemetryTimerCancelHandle, - >, grpc_task: Option, grpc_shutdown: CancellationToken, http_task: Option, @@ -703,9 +687,6 @@ impl OTLPReceiver { } if grpc_task_done && http_task_done && ack_registry.is_empty() { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } effect_handler.notify_receiver_drained().await?; terminal_state = TerminalState::new(deadline, [self.metrics.lock().snapshot()]); break; @@ -749,9 +730,6 @@ impl OTLPReceiver { grpc_shutdown.cancel(); http_shutdown.cancel(); ack_registry.force_shutdown(&reason); - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } terminal_state = TerminalState::new(deadline, [self.metrics.lock().snapshot()]); break; } @@ -759,16 +737,12 @@ impl OTLPReceiver { self.handle_control_message( other, ack_registry, - telemetry_cancel_handle, ) .await?; } } } Err(e) => { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } return Err(Error::ChannelRecvError(e)); } } @@ -1112,13 +1086,11 @@ mod tests { }; let receiver_handle = tokio::task::spawn_local(async move { - let mut telemetry_cancel_handle = None; receiver .run_event_loop( &mut ctrl_chan, &effect_handler, &ack_registry, - &mut telemetry_cancel_handle, None, CancellationToken::new(), Some(http_task), diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/syslog_cef_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/syslog_cef_receiver/mod.rs index 1defef8073..241795a2db 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/syslog_cef_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/syslog_cef_receiver/mod.rs @@ -288,11 +288,6 @@ impl local::Receiver for SyslogCefReceiver { mut ctrl_chan: local::ControlChannel, effect_handler: local::EffectHandler, ) -> Result { - // Start periodic telemetry collection (1s), similar to other nodes - let telemetry_timer_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - match &self.config.protocol { Protocol::Tcp(tcp_config) => { otel_info!( @@ -347,7 +342,6 @@ impl local::Receiver for SyslogCefReceiver { // for TCP we still wait for already accepted connection tasks // to flush their per-connection buffers before reporting // ReceiverDrained to the runtime. - let _ = telemetry_timer_handle.cancel().await; shutdown_flag.set(true); // Signal all connection tasks to flush and exit // Wait for active tasks to finish flushing. @@ -377,7 +371,6 @@ impl local::Receiver for SyslogCefReceiver { return Ok(TerminalState::new(deadline, [snapshot])); } Ok(NodeControlMsg::Shutdown { deadline, .. }) => { - let _ = telemetry_timer_handle.cancel().await; shutdown_flag.set(true); let snapshot = self.metrics.borrow().snapshot(); return Ok(TerminalState::new(deadline, [snapshot])); @@ -783,7 +776,6 @@ impl local::Receiver for SyslogCefReceiver { // UDP has no long-lived connection tasks, so receiver-first // drain just means: stop ingesting new packets, flush the // current batch buffer once, then report ReceiverDrained. - let _ = telemetry_timer_handle.cancel().await; if arrow_records_builder.len() > 0 { let items = u64::from(arrow_records_builder.len()); @@ -811,7 +803,6 @@ impl local::Receiver for SyslogCefReceiver { return Ok(TerminalState::new(deadline, [snapshot])); } Ok(NodeControlMsg::Shutdown { deadline, .. }) => { - let _ = telemetry_timer_handle.cancel().await; let snapshot = self.metrics.borrow().snapshot(); return Ok(TerminalState::new(deadline, [snapshot])); } diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/topic_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/topic_receiver/mod.rs index 894af7f71d..6a9fcb3b56 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/topic_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/topic_receiver/mod.rs @@ -40,7 +40,7 @@ use std::collections::HashSet; use std::future::{self, Future}; use std::pin::Pin; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; /// URN for the topic receiver. pub const TOPIC_RECEIVER_URN: &str = "urn:otel:receiver:topic"; @@ -248,11 +248,6 @@ impl local::Receiver for TopicReceiver { ack_propagation = format!("{ack_propagation_mode:?}"), message = "Topic receiver started" ); - let mut telemetry_cancel_handle = Some( - effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?, - ); let mut draining_deadline: Option = None; let mut draining_reason: Option = None; // These represent two different handoff stages: @@ -300,9 +295,6 @@ impl local::Receiver for TopicReceiver { } if pending_tracked_message_ids.is_empty() { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } effect_handler.notify_receiver_drained().await?; return Ok(TerminalState::new(deadline, [metrics.snapshot()])); } @@ -335,9 +327,6 @@ impl local::Receiver for TopicReceiver { } } } - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } effect_handler.notify_receiver_drained().await?; return Ok(TerminalState::new(deadline, [metrics.snapshot()])); } @@ -460,9 +449,6 @@ impl local::Receiver for TopicReceiver { } } Ok(NodeControlMsg::Shutdown { deadline, .. }) => { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } return Ok(TerminalState::new(deadline, [metrics.snapshot()])); } Ok(_) => {} @@ -631,9 +617,6 @@ impl local::Receiver for TopicReceiver { } } Ok(NodeControlMsg::Shutdown { deadline, .. }) => { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } return Ok(TerminalState::new(deadline, [metrics.snapshot()])); } Ok(_) => {} @@ -738,9 +721,6 @@ impl local::Receiver for TopicReceiver { } .await; - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } run_result } } diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/traffic_generator/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/traffic_generator/mod.rs index 897e78e9b8..648efba092 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/traffic_generator/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/traffic_generator/mod.rs @@ -524,10 +524,6 @@ impl local::Receiver for TrafficGeneratorReceiver { let transport_headers = build_transport_headers(self.config.transport_headers()); - let _ = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - let run_len = producer.run_len(); // We consume one tick here because it's always immediately ready and would diff --git a/rust/otap-dataflow/crates/engine/src/control.rs b/rust/otap-dataflow/crates/engine/src/control.rs index af1e7e2f46..e56126ce5c 100644 --- a/rust/otap-dataflow/crates/engine/src/control.rs +++ b/rust/otap-dataflow/crates/engine/src/control.rs @@ -607,6 +607,12 @@ impl ControlSenders { .collect() } + /// Returns all registered node ids. + #[must_use] + pub fn node_ids(&self) -> Vec { + self.senders.keys().copied().collect() + } + /// Returns the registered non-receiver ids. #[must_use] pub fn non_receiver_ids(&self) -> Vec { diff --git a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs index 450a098ffe..9acf3030ae 100644 --- a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs +++ b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs @@ -295,7 +295,7 @@ impl RuntimeCtrlMsgManager { channel_metrics: Vec, node_metric_handles: Rc>>>, ) -> Self { - Self { + let mut result = Self { runtime_control_metrics: RuntimeControlMetricsState::new( &pipeline_context, metrics_reporter.clone(), @@ -319,7 +319,19 @@ impl RuntimeCtrlMsgManager { node_metric_handles, telemetry: telemetry_policy, pending_sends: VecDeque::new(), + }; + + // Register telemetry timers for all nodes centrally, using the + // configured reporting interval. This replaces per-node + // start_periodic_telemetry calls and ensures a single, consistent + // collection cadence across all nodes. + for node_id in result.control_senders.node_ids() { + result + .telemetry_timers + .start(node_id, result.control_plane_metrics_flush_interval); } + + result } /// Runs the runtime-control manager event loop. @@ -1247,7 +1259,7 @@ mod tests { use tokio::task::LocalSet; use tokio::time::timeout; - const TEST_CONTROL_PLANE_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10); + const TEST_CONTROL_PLANE_METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(3600); fn empty_node_metric_handles() -> Rc>>> { Rc::new(RefCell::new(Vec::new())) @@ -2043,8 +2055,8 @@ mod tests { let telemetry_count = manager.test_telemetry_count(); assert_eq!( - telemetry_count, 0, - "Telemetry timer queue should be empty initially" + telemetry_count, 3, + "Telemetry timers should be pre-registered for all 3 nodes" ); assert_eq!( diff --git a/rust/otap-dataflow/crates/engine/src/processor.rs b/rust/otap-dataflow/crates/engine/src/processor.rs index 1ffcd059e2..a8a25b8101 100644 --- a/rust/otap-dataflow/crates/engine/src/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/processor.rs @@ -38,7 +38,6 @@ use otap_df_telemetry::metrics::MetricSet; use otap_df_telemetry::reporter::MetricsReporter; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; /// FlowMetric-relevant slice of a processor `EffectHandler`'s surface. /// @@ -621,11 +620,6 @@ impl ProcessorWrapper { flow_metrics_active, ); - // Start periodic telemetry collection - let telemetry_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - while let Ok(mut msg) = inbox.recv_when(processor.accept_pdata()).await { if effect_handler.flow_metrics_active() { match &mut msg { @@ -644,8 +638,6 @@ impl ProcessorWrapper { } processor.process(msg, &mut effect_handler).await?; } - // Cancel periodic collection - _ = telemetry_cancel_handle.cancel().await; // Collect final metrics before exiting if effect_handler.is_flow_start() || effect_handler.is_flow_end() { effect_handler.report_flow_metrics(); @@ -681,11 +673,6 @@ impl ProcessorWrapper { flow_metrics_active, ); - // Start periodic telemetry collection - let telemetry_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - while let Ok(mut msg) = inbox.recv_when(processor.accept_pdata()).await { if effect_handler.flow_metrics_active() { match &mut msg { @@ -704,8 +691,6 @@ impl ProcessorWrapper { } processor.process(msg, &mut effect_handler).await?; } - // Cancel periodic collection - _ = telemetry_cancel_handle.cancel().await; // Collect final metrics before exiting if effect_handler.is_flow_start() || effect_handler.is_flow_end() { effect_handler.report_flow_metrics(); diff --git a/rust/otap-dataflow/crates/validation/src/validation_exporter.rs b/rust/otap-dataflow/crates/validation/src/validation_exporter.rs index 878fded0a5..62d60ba68a 100644 --- a/rust/otap-dataflow/crates/validation/src/validation_exporter.rs +++ b/rust/otap-dataflow/crates/validation/src/validation_exporter.rs @@ -189,11 +189,8 @@ impl Exporter for ValidationExporter { async fn start( mut self: Box, mut msg_chan: ExporterInbox, - effect_handler: EffectHandler, + _effect_handler: EffectHandler, ) -> Result { - let _ = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; let mut time = Instant::now(); let mut last_message_time = Instant::now(); loop { diff --git a/tools/pipeline_perf_test/test_suites/integration/continuous/idle-state-template.yaml.j2 b/tools/pipeline_perf_test/test_suites/integration/continuous/idle-state-template.yaml.j2 index c70201494c..8fb7326973 100644 --- a/tools/pipeline_perf_test/test_suites/integration/continuous/idle-state-template.yaml.j2 +++ b/tools/pipeline_perf_test/test_suites/integration/continuous/idle-state-template.yaml.j2 @@ -29,6 +29,7 @@ components: {% endif %} prometheus: endpoint: http://localhost:{{port}}/api/v1/telemetry/metrics?format=prometheus&reset=false + interval: 5.0 tests: - name: Idle State Baseline - {{core_label}} @@ -42,7 +43,7 @@ tests: run: pre: - render_template: - template_path: 'test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp.yaml' + template_path: 'test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml' output_path: ./test_suites/integration/configs/engine/config.rendered.yaml variables: backend_hostname: localhost diff --git a/tools/pipeline_perf_test/test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml b/tools/pipeline_perf_test/test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml new file mode 100644 index 0000000000..4984bb08b5 --- /dev/null +++ b/tools/pipeline_perf_test/test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml @@ -0,0 +1,43 @@ +version: otel_dataflow/v1 +policies: + channel_capacity: + control: + node: 100 + pipeline: 100 + pdata: 100 +engine: + telemetry: + reporting_interval: "5s" + logs: + level: info +groups: + default: + pipelines: + main: + nodes: + receiver: + type: urn:otel:receiver:otlp + config: + protocols: + grpc: + listening_addr: 0.0.0.0:4317 + request_compression: gzip + attr: + type: urn:otel:processor:attribute + config: + actions: + - action: rename + source_key: ios.app.state + destination_key: ios.app.state2 + apply_to: + - signal + exporter: + type: urn:otel:exporter:otlp_grpc + config: + grpc_endpoint: http://{{backend_hostname}}:1235 + compression_method: gzip + connections: + - from: receiver + to: attr + - from: attr + to: exporter