From b783a18ff4d0a57a2ab9e00fb2b0102996e1c184 Mon Sep 17 00:00:00 2001 From: Cithomas Date: Fri, 1 May 2026 05:09:51 +0000 Subject: [PATCH 1/5] fix(engine): centralize telemetry timer management in runtime manager Fixes https://github.com/open-telemetry/otel-arrow/issues/1305 Previously, every node (receiver, processor, exporter) independently called effect_handler.start_periodic_telemetry(Duration::from_secs(1)) with a hardcoded 1-second interval. This was: - Not configurable by operators - Not enforceable (each node picked its own interval) - A significant contributor to idle CPU (~50 millicores on 4 cores) The runtime manager now registers telemetry timers for all nodes centrally during pipeline startup, using the configured engine.telemetry.reporting_interval. This: - Removes start_periodic_telemetry calls from all 15 node files - Eliminates per-node cancel handle management on shutdown - Enforces a single, consistent collection cadence by construction - Uses the existing configurable reporting_interval (default 1s) The idle test configuration is updated to use reporting_interval: 5s and a matching 5s Prometheus scrape interval, reducing idle CPU from ~0.9% to ~0.1% on 4 cores. Also fixes the idle-state-template Prometheus endpoint URLs to use the correct /api/v1 prefix. --- .../azure_monitor_exporter/exporter.rs | 9 -- .../src/exporters/geneva_exporter/mod.rs | 7 -- .../src/exporters/otap_exporter/mod.rs | 5 - .../src/exporters/otlp_grpc_exporter/mod.rs | 5 - .../src/exporters/otlp_http_exporter/mod.rs | 6 -- .../src/exporters/parquet_exporter/mod.rs | 100 ------------------ .../src/exporters/perf_exporter/mod.rs | 7 -- .../src/exporters/topic_exporter/mod.rs | 5 - .../src/receivers/fake_data_generator/mod.rs | 4 - .../internal_telemetry_receiver/mod.rs | 5 - .../src/receivers/otap_receiver/mod.rs | 19 ---- .../src/receivers/otlp_receiver/mod.rs | 28 ----- .../src/receivers/syslog_cef_receiver/mod.rs | 9 -- .../src/receivers/topic_receiver/mod.rs | 22 +--- .../crates/engine/src/control.rs | 6 ++ .../crates/engine/src/pipeline_ctrl.rs | 20 +++- .../crates/engine/src/processor.rs | 15 --- .../validation/src/validation_exporter.rs | 5 +- .../continuous/idle-state-template.yaml.j2 | 7 +- .../continuous/otlp-attr-otlp-idle.yaml | 43 ++++++++ 20 files changed, 71 insertions(+), 256 deletions(-) create mode 100644 tools/pipeline_perf_test/test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs index f8db79fa52..94845cacd8 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs @@ -486,14 +486,6 @@ impl Exporter for AzureMonitorExporter { } })?; - // Start periodic telemetry collection and retain the cancel handle for graceful shutdown - let telemetry_timer_cancel_handle = effect_handler - .start_periodic_telemetry(std::time::Duration::from_secs(1)) - .await - .map_err(|e| EngineError::InternalError { - message: format!("Failed to start telemetry timer: {e}"), - })?; - let mut next_periodic_export = tokio::time::Instant::now() + tokio::time::Duration::from_secs(PERIODIC_EXPORT_INTERVAL); let mut next_heartbeat_send = tokio::time::Instant::now(); @@ -639,7 +631,6 @@ impl Exporter for AzureMonitorExporter { let _ = 
self.metrics.borrow_mut().report(&mut metrics_reporter); } Ok(Message::Control(NodeControlMsg::Shutdown { deadline, .. })) => { - let _ = telemetry_timer_cancel_handle.cancel().await; self.handle_shutdown(&effect_handler).await?; let snapshot = self.metrics.borrow().metrics().snapshot(); return Ok(TerminalState::new( diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs index a3fd947e30..21975d6e9e 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs @@ -724,12 +724,6 @@ impl Exporter for GenevaExporter { message = "Geneva exporter starting" ); - // Start periodic telemetry collection so CollectTelemetry messages - // are delivered to this exporter's message channel. - let timer_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - // Message loop loop { match msg_chan.recv().await? { @@ -739,7 +733,6 @@ impl Exporter for GenevaExporter { message = "Geneva exporter shutting down" ); - _ = timer_cancel_handle.cancel().await; return Ok(TerminalState::new( deadline, [self.pdata_metrics.snapshot(), self.metrics.snapshot()], diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs index f7a434873f..e1ad91e582 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/otap_exporter/mod.rs @@ -349,10 +349,6 @@ impl local::Exporter for OTAPExporter { } })?; - let timer_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - // start a grpc client and connect to the server let mut arrow_metrics_client = ArrowMetricsServiceClient::new(channel.clone()); let mut arrow_logs_client = ArrowLogsServiceClient::new(channel.clone()); @@ -451,7 +447,6 @@ impl local::Exporter for OTAPExporter { await_stream_handles(logs_handles).await; await_stream_handles(metrics_handles).await; await_stream_handles(traces_handles).await; - _ = timer_cancel_handle.cancel().await; self.export_latency_window .report_into(&mut self.async_metrics); return Ok(TerminalState::new( diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs index a4342f81c5..5b33695df1 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_grpc_exporter/mod.rs @@ -46,7 +46,6 @@ use otap_df_telemetry::{otel_debug, otel_info, otel_warn}; use serde::Deserialize; use std::future::Future; use std::sync::Arc; -use std::time::Duration; use tonic::codec::CompressionEncoding; use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue}; use tonic::transport::Channel; @@ -132,9 +131,6 @@ impl Exporter for OTLPExporter { self.config.grpc.log_proxy_info(); let exporter_id = effect_handler.exporter_id(); - let timer_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; let channel = self .config @@ -246,7 +242,6 @@ impl Exporter for OTLPExporter { grpc_clients.release(client); } } - _ = timer_cancel_handle.cancel().await; return Ok(TerminalState::new(deadline, [self.pdata_metrics])); } 
Message::Control(NodeControlMsg::CollectTelemetry { diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_http_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_http_exporter/mod.rs index 0db454dcc9..4bde706a41 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_http_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/otlp_http_exporter/mod.rs @@ -16,7 +16,6 @@ use std::num::NonZeroUsize; use std::rc::Rc; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; @@ -225,10 +224,6 @@ impl Exporter for OtlpHttpExporter { traces_endpoint = traces_endpoint.as_str(), ); - let telemetry_timer_cancel = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - let max_in_flight = self.config.max_in_flight.max(1); let mut client_pool = HttpClientPool::try_new(&self.config.http, self.config.client_pool_size) @@ -307,7 +302,6 @@ impl Exporter for OtlpHttpExporter { .await; } } - _ = telemetry_timer_cancel.cancel().await; return Ok(TerminalState::new(deadline, [self.pdata_metrics])); } Message::Control(NodeControlMsg::CollectTelemetry { diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs index 248edf75fc..47090e2e4f 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/parquet_exporter/mod.rs @@ -188,11 +188,6 @@ impl Exporter for ParquetExporter { .await?; } - // Start periodic telemetry collection (internal metrics) - let telemetry_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - let mut writer = writer::WriterManager::new(object_store, writer_options); let mut batch_id = 0; let mut id_generator = PartitionSequenceIdGenerator::new(); @@ -256,7 +251,6 @@ impl Exporter for ParquetExporter { // granularity is ~15 ms), so an explicit check avoids a // race between the timeout and the flush future. if deadline.checked_duration_since(Instant::now()).is_none() { - let _ = telemetry_cancel_handle.cancel().await; return Err(Error::IoError { node: exporter_id.clone(), error: std::io::Error::from(ErrorKind::TimedOut), @@ -267,15 +261,7 @@ impl Exporter for ParquetExporter { let flush_all = writer.flush_all().fuse(); pin_mut!(flush_all); // Stop telemetry loop concurrently with flushing; do not block shutdown on cancel - let cancel_fut = async { - let _ = telemetry_cancel_handle.cancel().await; - futures::future::pending::<()>().await - } - .fuse(); - pin_mut!(cancel_fut); - return futures::select_biased! 
{ - _ = cancel_fut => unreachable!(), _timeout = timeout => Err(Error::IoError { node: exporter_id.clone(), error: std::io::Error::from(ErrorKind::TimedOut) @@ -1224,92 +1210,6 @@ mod test { exporter_result.unwrap(); } - #[test] - fn test_starts_telemetry_timer() { - use otap_df_engine::control::runtime_ctrl_msg_channel; - use otap_df_engine::testing::test_node; - - let test_runtime = TestRuntime::::new(); - let temp_dir = tempfile::tempdir().unwrap(); - let base_dir: String = temp_dir.path().to_str().unwrap().into(); - let exporter = ParquetExporter::new(config::Config { - storage: object_store::StorageType::File { - base_uri: base_dir.clone(), - }, - partitioning_strategies: None, - writer_options: None, - }); - let node_config = Arc::new(NodeUserConfig::new_exporter_config(PARQUET_EXPORTER_URN)); - let mut exporter = ExporterWrapper::::local::( - exporter, - test_node(test_runtime.config().name.clone()), - node_config, - test_runtime.config(), - ); - - let (rt, _) = setup_test_runtime(); - let control_sender = exporter.control_sender(); - let (pdata_tx, pdata_rx) = create_not_send_channel::(1); - let _pdata_tx = Sender::Local(LocalSender::mpsc(pdata_tx)); - let pdata_rx = Receiver::Local(LocalReceiver::mpsc(pdata_rx)); - - let (runtime_ctrl_msg_tx, mut runtime_ctrl_msg_rx) = runtime_ctrl_msg_channel(10); - let (pipeline_completion_msg_tx, _pipeline_completion_msg_rx) = - pipeline_completion_msg_channel::(10); - - exporter - .set_pdata_receiver(test_node("exp"), pdata_rx) - .expect("Failed to set PData Receiver"); - - async fn start_exporter( - exporter: ExporterWrapper, - runtime_ctrl_msg_tx: RuntimeCtrlMsgSender, - pipeline_completion_msg_tx: PipelineCompletionMsgSender, - ) -> Result<(), Error> { - let (_metrics_rx, metrics_reporter) = - otap_df_telemetry::reporter::MetricsReporter::create_new_and_receiver(1); - exporter - .start( - runtime_ctrl_msg_tx, - pipeline_completion_msg_tx, - metrics_reporter, - Interests::empty(), - ) - .await - .map(|_| ()) - } - - let (_exporter_result, _ignored) = rt.block_on(async move { - tokio::join!( - start_exporter(exporter, runtime_ctrl_msg_tx, pipeline_completion_msg_tx), - async move { - // Expect StartTelemetryTimer quickly after startup - let msg = tokio::time::timeout(Duration::from_millis(1500), async { - runtime_ctrl_msg_rx.recv().await - }) - .await - .expect("timed out waiting for StartTelemetryTimer") - .expect("runtime-control channel closed"); - - match msg { - RuntimeControlMsg::StartTelemetryTimer { duration, .. 
} => { - assert_eq!(duration, Duration::from_secs(1)); - } - other => panic!("Expected StartTelemetryTimer, got {other:?}"), - } - - // Shutdown exporter to end the test - let _ = control_sender - .send(NodeControlMsg::Shutdown { - deadline: Instant::now(), - reason: "done".into(), - }) - .await; - } - ) - }); - } - #[test] fn test_traces() { let test_runtime = TestRuntime::::new(); diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs index 2503116ee6..fcc0c078ff 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs @@ -47,7 +47,6 @@ use otap_df_telemetry::otel_info; use serde_json::Value; use std::sync::Arc; use std::time::Instant; -use tokio::time::Duration; /// The URN for the OTAP Perf exporter pub const OTAP_PERF_EXPORTER_URN: &str = "urn:otel:exporter:perf"; @@ -142,11 +141,6 @@ impl local::Exporter for PerfExporter { message = "Starting Perf Exporter" ); - // Start telemetry collection tick as a dedicated control message. - let timer_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_millis(self.config.frequency())) - .await?; - // Loop until a Shutdown event is received. loop { let msg = msg_chan.recv().await?; @@ -160,7 +154,6 @@ impl local::Exporter for PerfExporter { // ToDo: Handle configuration changes Message::Control(NodeControlMsg::Config { .. }) => {} Message::Control(NodeControlMsg::Shutdown { deadline, .. }) => { - _ = timer_cancel_handle.cancel().await; return Ok(self.terminal_state(deadline)); } Message::PData(mut pdata) => { diff --git a/rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs index 7c522cad97..00fbc92929 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs @@ -36,7 +36,6 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::time::Duration; /// URN for the topic exporter. 
pub const TOPIC_EXPORTER_URN: &str = "urn:otel:exporter:topic"; @@ -404,9 +403,6 @@ impl Exporter for TopicExporter { ack_propagation = format!("{ack_propagation_mode:?}"), message = "Topic exporter started" ); - let telemetry_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; let run_result: Result<(), Error> = async { loop { @@ -583,7 +579,6 @@ impl Exporter for TopicExporter { } .await; - _ = telemetry_cancel_handle.cancel().await; run_result?; Ok(TerminalState::default()) } diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/mod.rs index aa6f65e2e4..74808381c2 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/mod.rs @@ -522,10 +522,6 @@ impl local::Receiver for FakeGeneratorReceiver { let transport_headers = build_transport_headers(self.config.transport_headers()); - let _ = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - let run_len = producer.run_len(); // We consume one tick here because it's always immediately ready and would diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs index 8f10d6f8a2..19d168eb40 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs @@ -113,11 +113,6 @@ impl local::Receiver for InternalTelemetryReceiver { let log_tap = internal.log_tap; let mut scope_cache = ScopeToBytesMap::new(internal.registry); - // Start periodic telemetry collection - let _ = effect_handler - .start_periodic_telemetry(std::time::Duration::from_secs(1)) - .await?; - loop { tokio::select! 
{ biased; diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/otap_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/otap_receiver/mod.rs index c37ba4582f..5f65415230 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/otap_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/otap_receiver/mod.rs @@ -383,13 +383,6 @@ impl shared::Receiver for OTAPReceiver { .add_service(metrics_server) .add_service(traces_server); - // Start periodic telemetry collection - let mut telemetry_cancel_handle = Some( - effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?, - ); - let grpc_shutdown = CancellationToken::new(); let server_task = { let grpc_shutdown = grpc_shutdown.clone(); @@ -440,9 +433,6 @@ impl shared::Receiver for OTAPReceiver { } if server_task_done && states.is_empty() { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } effect_handler.notify_receiver_drained().await?; self.flush_memory_pressure_metrics(); terminal_state = TerminalState::new(deadline, [self.metrics.snapshot()]); @@ -480,9 +470,6 @@ impl shared::Receiver for OTAPReceiver { otap_df_telemetry::otel_info!("otap_receiver.shutdown"); grpc_shutdown.cancel(); states.force_shutdown(&reason); - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } self.flush_memory_pressure_metrics(); terminal_state = TerminalState::new(deadline, [self.metrics.snapshot()]); break; @@ -501,9 +488,6 @@ impl shared::Receiver for OTAPReceiver { self.handle_nack_response(self.route_nack_response(&states, nack)); } Err(e) => { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } return Err(Error::ChannelRecvError(e)); } _ => {} @@ -523,9 +507,6 @@ impl shared::Receiver for OTAPReceiver { } if draining_deadline.is_none() { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } self.flush_memory_pressure_metrics(); terminal_state = TerminalState::new( clock::now().add(Duration::from_secs(1)), diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs index bac50923a9..74a28970cb 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/otlp_receiver/mod.rs @@ -65,9 +65,6 @@ use tower::util::Either; /// URN for the OTLP Receiver pub const OTLP_RECEIVER_URN: &str = "urn:otel:receiver:otlp"; -/// Interval for periodic telemetry collection. -const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1); - /// Configuration for OTLP Receiver. /// /// The receiver supports three deployment modes matching the Go collector's `otlpreceiver`: @@ -388,9 +385,6 @@ impl OTLPReceiver { &mut self, msg: NodeControlMsg, registry: &AckRegistry, - _telemetry_cancel_handle: &mut Option< - otap_df_engine::effect_handler::TelemetryTimerCancelHandle, - >, ) -> Result<(), Error> { match msg { NodeControlMsg::CollectTelemetry { @@ -620,19 +614,12 @@ impl shared::Receiver for OTLPReceiver { None }; - let mut telemetry_cancel_handle = Some( - effect_handler - .start_periodic_telemetry(TELEMETRY_INTERVAL) - .await?, - ); - // Run the event loop based on which protocols are enabled. 
let terminal_state = self .run_event_loop( &mut ctrl_msg_recv, &effect_handler, &ack_registry, - &mut telemetry_cancel_handle, grpc_task, grpc_shutdown, http_task, @@ -657,9 +644,6 @@ impl OTLPReceiver { ctrl_msg_recv: &mut shared::ControlChannel, effect_handler: &shared::EffectHandler, ack_registry: &AckRegistry, - telemetry_cancel_handle: &mut Option< - otap_df_engine::effect_handler::TelemetryTimerCancelHandle, - >, grpc_task: Option, grpc_shutdown: CancellationToken, http_task: Option, @@ -701,9 +685,6 @@ impl OTLPReceiver { } if grpc_task_done && http_task_done && ack_registry.is_empty() { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } effect_handler.notify_receiver_drained().await?; terminal_state = TerminalState::new(deadline, [self.metrics.lock().snapshot()]); break; @@ -747,9 +728,6 @@ impl OTLPReceiver { grpc_shutdown.cancel(); http_shutdown.cancel(); ack_registry.force_shutdown(&reason); - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } terminal_state = TerminalState::new(deadline, [self.metrics.lock().snapshot()]); break; } @@ -757,16 +735,12 @@ impl OTLPReceiver { self.handle_control_message( other, ack_registry, - telemetry_cancel_handle, ) .await?; } } } Err(e) => { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } return Err(Error::ChannelRecvError(e)); } } @@ -1110,13 +1084,11 @@ mod tests { }; let receiver_handle = tokio::task::spawn_local(async move { - let mut telemetry_cancel_handle = None; receiver .run_event_loop( &mut ctrl_chan, &effect_handler, &ack_registry, - &mut telemetry_cancel_handle, None, CancellationToken::new(), Some(http_task), diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/syslog_cef_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/syslog_cef_receiver/mod.rs index 22652ac687..e4086edf31 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/syslog_cef_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/syslog_cef_receiver/mod.rs @@ -288,11 +288,6 @@ impl local::Receiver for SyslogCefReceiver { mut ctrl_chan: local::ControlChannel, effect_handler: local::EffectHandler, ) -> Result { - // Start periodic telemetry collection (1s), similar to other nodes - let telemetry_timer_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - match &self.config.protocol { Protocol::Tcp(tcp_config) => { otel_info!( @@ -347,7 +342,6 @@ impl local::Receiver for SyslogCefReceiver { // for TCP we still wait for already accepted connection tasks // to flush their per-connection buffers before reporting // ReceiverDrained to the runtime. - let _ = telemetry_timer_handle.cancel().await; shutdown_flag.set(true); // Signal all connection tasks to flush and exit // Wait for active tasks to finish flushing. @@ -377,7 +371,6 @@ impl local::Receiver for SyslogCefReceiver { return Ok(TerminalState::new(deadline, [snapshot])); } Ok(NodeControlMsg::Shutdown { deadline, .. }) => { - let _ = telemetry_timer_handle.cancel().await; shutdown_flag.set(true); let snapshot = self.metrics.borrow().snapshot(); return Ok(TerminalState::new(deadline, [snapshot])); @@ -783,7 +776,6 @@ impl local::Receiver for SyslogCefReceiver { // UDP has no long-lived connection tasks, so receiver-first // drain just means: stop ingesting new packets, flush the // current batch buffer once, then report ReceiverDrained. 
- let _ = telemetry_timer_handle.cancel().await; if arrow_records_builder.len() > 0 { let items = u64::from(arrow_records_builder.len()); @@ -811,7 +803,6 @@ impl local::Receiver for SyslogCefReceiver { return Ok(TerminalState::new(deadline, [snapshot])); } Ok(NodeControlMsg::Shutdown { deadline, .. }) => { - let _ = telemetry_timer_handle.cancel().await; let snapshot = self.metrics.borrow().snapshot(); return Ok(TerminalState::new(deadline, [snapshot])); } diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/topic_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/topic_receiver/mod.rs index a770b9ff9e..52cca6bce3 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/topic_receiver/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/topic_receiver/mod.rs @@ -40,7 +40,7 @@ use std::collections::HashSet; use std::future::{self, Future}; use std::pin::Pin; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; /// URN for the topic receiver. pub const TOPIC_RECEIVER_URN: &str = "urn:otel:receiver:topic"; @@ -248,11 +248,6 @@ impl local::Receiver for TopicReceiver { ack_propagation = format!("{ack_propagation_mode:?}"), message = "Topic receiver started" ); - let mut telemetry_cancel_handle = Some( - effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?, - ); let mut draining_deadline: Option = None; let mut draining_reason: Option = None; // These represent two different handoff stages: @@ -300,9 +295,6 @@ impl local::Receiver for TopicReceiver { } if pending_tracked_message_ids.is_empty() { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } effect_handler.notify_receiver_drained().await?; return Ok(TerminalState::new(deadline, [metrics.snapshot()])); } @@ -335,9 +327,6 @@ impl local::Receiver for TopicReceiver { } } } - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } effect_handler.notify_receiver_drained().await?; return Ok(TerminalState::new(deadline, [metrics.snapshot()])); } @@ -460,9 +449,6 @@ impl local::Receiver for TopicReceiver { } } Ok(NodeControlMsg::Shutdown { deadline, .. }) => { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } return Ok(TerminalState::new(deadline, [metrics.snapshot()])); } Ok(_) => {} @@ -631,9 +617,6 @@ impl local::Receiver for TopicReceiver { } } Ok(NodeControlMsg::Shutdown { deadline, .. }) => { - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } return Ok(TerminalState::new(deadline, [metrics.snapshot()])); } Ok(_) => {} @@ -738,9 +721,6 @@ impl local::Receiver for TopicReceiver { } .await; - if let Some(handle) = telemetry_cancel_handle.take() { - _ = handle.cancel().await; - } run_result } } diff --git a/rust/otap-dataflow/crates/engine/src/control.rs b/rust/otap-dataflow/crates/engine/src/control.rs index af1e7e2f46..e56126ce5c 100644 --- a/rust/otap-dataflow/crates/engine/src/control.rs +++ b/rust/otap-dataflow/crates/engine/src/control.rs @@ -607,6 +607,12 @@ impl ControlSenders { .collect() } + /// Returns all registered node ids. + #[must_use] + pub fn node_ids(&self) -> Vec { + self.senders.keys().copied().collect() + } + /// Returns the registered non-receiver ids. 
#[must_use] pub fn non_receiver_ids(&self) -> Vec { diff --git a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs index 49b209a54b..e9cd9c05f2 100644 --- a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs +++ b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs @@ -295,7 +295,7 @@ impl RuntimeCtrlMsgManager { channel_metrics: Vec, node_metric_handles: Rc>>>, ) -> Self { - Self { + let mut result = Self { runtime_control_metrics: RuntimeControlMetricsState::new( &pipeline_context, metrics_reporter.clone(), @@ -319,7 +319,19 @@ impl RuntimeCtrlMsgManager { node_metric_handles, telemetry: telemetry_policy, pending_sends: VecDeque::new(), + }; + + // Register telemetry timers for all nodes centrally, using the + // configured reporting interval. This replaces per-node + // start_periodic_telemetry calls and ensures a single, consistent + // collection cadence across all nodes. + for node_id in result.control_senders.node_ids() { + result + .telemetry_timers + .start(node_id, result.control_plane_metrics_flush_interval); } + + result } /// Runs the runtime-control manager event loop. @@ -1247,7 +1259,7 @@ mod tests { use tokio::task::LocalSet; use tokio::time::timeout; - const TEST_CONTROL_PLANE_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10); + const TEST_CONTROL_PLANE_METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(3600); fn empty_node_metric_handles() -> Rc>>> { Rc::new(RefCell::new(Vec::new())) @@ -2043,8 +2055,8 @@ mod tests { let telemetry_count = manager.test_telemetry_count(); assert_eq!( - telemetry_count, 0, - "Telemetry timer queue should be empty initially" + telemetry_count, 3, + "Telemetry timers should be pre-registered for all 3 nodes" ); assert_eq!( diff --git a/rust/otap-dataflow/crates/engine/src/processor.rs b/rust/otap-dataflow/crates/engine/src/processor.rs index b5217b03ee..6f864a832b 100644 --- a/rust/otap-dataflow/crates/engine/src/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/processor.rs @@ -34,7 +34,6 @@ use otap_df_config::node::NodeUserConfig; use otap_df_telemetry::reporter::MetricsReporter; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; /// Processor-local wakeup requirements declared by a processor implementation. 
/// @@ -547,16 +546,9 @@ impl ProcessorWrapper { .core .set_completion_emission_metrics(completion_emission_metrics.clone()); - // Start periodic telemetry collection - let telemetry_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - while let Ok(msg) = inbox.recv_when(processor.accept_pdata()).await { processor.process(msg, &mut effect_handler).await?; } - // Cancel periodic collection - _ = telemetry_cancel_handle.cancel().await; // Collect final metrics before exiting processor .process( @@ -581,16 +573,9 @@ impl ProcessorWrapper { .core .set_completion_emission_metrics(completion_emission_metrics); - // Start periodic telemetry collection - let telemetry_cancel_handle = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; - while let Ok(msg) = inbox.recv_when(processor.accept_pdata()).await { processor.process(msg, &mut effect_handler).await?; } - // Cancel periodic collection - _ = telemetry_cancel_handle.cancel().await; // Collect final metrics before exiting processor .process( diff --git a/rust/otap-dataflow/crates/validation/src/validation_exporter.rs b/rust/otap-dataflow/crates/validation/src/validation_exporter.rs index 76ddf1225b..922ba0510e 100644 --- a/rust/otap-dataflow/crates/validation/src/validation_exporter.rs +++ b/rust/otap-dataflow/crates/validation/src/validation_exporter.rs @@ -188,11 +188,8 @@ impl Exporter for ValidationExporter { async fn start( mut self: Box, mut msg_chan: ExporterInbox, - effect_handler: EffectHandler, + _effect_handler: EffectHandler, ) -> Result { - let _ = effect_handler - .start_periodic_telemetry(Duration::from_secs(1)) - .await?; let mut time = Instant::now(); let mut last_message_time = Instant::now(); loop { diff --git a/tools/pipeline_perf_test/test_suites/integration/continuous/idle-state-template.yaml.j2 b/tools/pipeline_perf_test/test_suites/integration/continuous/idle-state-template.yaml.j2 index 353edc56a2..8fb7326973 100644 --- a/tools/pipeline_perf_test/test_suites/integration/continuous/idle-state-template.yaml.j2 +++ b/tools/pipeline_perf_test/test_suites/integration/continuous/idle-state-template.yaml.j2 @@ -28,7 +28,8 @@ components: docker_component: {} {% endif %} prometheus: - endpoint: http://localhost:{{port}}/telemetry/metrics?format=prometheus&reset=false + endpoint: http://localhost:{{port}}/api/v1/telemetry/metrics?format=prometheus&reset=false + interval: 5.0 tests: - name: Idle State Baseline - {{core_label}} @@ -42,13 +43,13 @@ tests: run: pre: - render_template: - template_path: 'test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp.yaml' + template_path: 'test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml' output_path: ./test_suites/integration/configs/engine/config.rendered.yaml variables: backend_hostname: localhost post: - ready_check_http: - url: http://localhost:{{port}}/telemetry/metrics?reset=false + url: http://localhost:{{port}}/api/v1/telemetry/metrics?reset=false method: GET expected_status_code: 200 diff --git a/tools/pipeline_perf_test/test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml b/tools/pipeline_perf_test/test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml new file mode 100644 index 0000000000..4984bb08b5 --- /dev/null +++ b/tools/pipeline_perf_test/test_suites/integration/templates/configs/engine/continuous/otlp-attr-otlp-idle.yaml @@ -0,0 +1,43 @@ +version: otel_dataflow/v1 +policies: + 
channel_capacity: + control: + node: 100 + pipeline: 100 + pdata: 100 +engine: + telemetry: + reporting_interval: "5s" + logs: + level: info +groups: + default: + pipelines: + main: + nodes: + receiver: + type: urn:otel:receiver:otlp + config: + protocols: + grpc: + listening_addr: 0.0.0.0:4317 + request_compression: gzip + attr: + type: urn:otel:processor:attribute + config: + actions: + - action: rename + source_key: ios.app.state + destination_key: ios.app.state2 + apply_to: + - signal + exporter: + type: urn:otel:exporter:otlp_grpc + config: + grpc_endpoint: http://{{backend_hostname}}:1235 + compression_method: gzip + connections: + - from: receiver + to: attr + - from: attr + to: exporter From c087b7886ca89f293396a5667f93b386e88e4b62 Mon Sep 17 00:00:00 2001 From: Cithomas Date: Tue, 12 May 2026 06:07:23 +0000 Subject: [PATCH 2/5] fix(config): reject zero reporting_interval in telemetry config A zero-duration reporting_interval would cause telemetry timers to reschedule at the same instant repeatedly, spinning the runtime control loop. Validate that the interval is non-zero during config validation so the error is caught early with a clear message. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../crates/config/src/pipeline/telemetry.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/rust/otap-dataflow/crates/config/src/pipeline/telemetry.rs b/rust/otap-dataflow/crates/config/src/pipeline/telemetry.rs index 33e2eaa1df..4a4d2c172b 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline/telemetry.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline/telemetry.rs @@ -36,6 +36,11 @@ pub struct TelemetryConfig { impl TelemetryConfig { /// Validates the telemetry configuration, including all metric readers. 
pub fn validate(&self) -> Result<(), crate::error::Error> { + if self.reporting_interval.is_zero() { + return Err(crate::error::Error::InvalidUserConfig { + error: "engine.telemetry.reporting_interval must be greater than zero".to_string(), + }); + } self.metrics.validate() } } @@ -312,6 +317,19 @@ mod tests { assert_eq!(config.metrics.readers.len(), 0); } + #[test] + fn test_zero_reporting_interval_rejected() { + let config = TelemetryConfig { + reporting_interval: Duration::ZERO, + ..Default::default() + }; + let err = config.validate().unwrap_err(); + assert!( + err.to_string().contains("greater than zero"), + "expected zero-interval error, got: {err}" + ); + } + #[test] fn test_attribute_value_deserialize_yaml() { let yaml_str = r#" From ecc447e1b89371b3cda519748a9c6f7daa79dfe1 Mon Sep 17 00:00:00 2001 From: Cithomas Date: Tue, 12 May 2026 06:28:48 +0000 Subject: [PATCH 3/5] chore: remove unrelated files accidentally included in PR --- issue.md | 118 ---------- .../bench/sender-direct-zstd-mif20.yaml | 37 --- .../bench/sender-direct-zstd.yaml | 36 --- .../bench/sender-pregen-zstd-mif20.yaml | 37 --- .../bench/sender-static-fresh-gzip.yaml | 36 --- .../bench/sender-static-fresh-zstd.yaml | 36 --- .../bench/sender-static-pregen-gzip.yaml | 36 --- .../bench/sender-static-pregen-zstd.yaml | 36 --- .../bench/sut-otlp-forward-gzip.yaml | 32 --- .../bench/sut-otlp-forward-zstd.yaml | 32 --- .../scripts/saturation_scaling_benchmark.sh | 204 ----------------- .../saturation-2lg-template.yaml.j2 | 104 --------- .../continuous/saturation-4cores-2lg.yaml | 19 -- .../test_steps/df-2loadgen-steps-docker.yaml | 210 ------------------ 14 files changed, 973 deletions(-) delete mode 100644 issue.md delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-direct-zstd-mif20.yaml delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-direct-zstd.yaml delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-pregen-zstd-mif20.yaml delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-fresh-gzip.yaml delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-fresh-zstd.yaml delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-pregen-gzip.yaml delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-pregen-zstd.yaml delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sut-otlp-forward-gzip.yaml delete mode 100644 rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sut-otlp-forward-zstd.yaml delete mode 100755 tools/pipeline_perf_test/scripts/saturation_scaling_benchmark.sh delete mode 100644 tools/pipeline_perf_test/test_suites/integration/continuous/saturation-2lg-template.yaml.j2 delete mode 100644 tools/pipeline_perf_test/test_suites/integration/continuous/saturation-4cores-2lg.yaml delete mode 100644 tools/pipeline_perf_test/test_suites/integration/templates/test_steps/df-2loadgen-steps-docker.yaml diff --git a/issue.md b/issue.md deleted file mode 100644 index 45b076c9b7..0000000000 --- a/issue.md +++ /dev/null @@ -1,118 +0,0 @@ -# Idle CPU Overhead: ~7.5 millicores per pipeline core - -## Summary - -At idle (no data flowing), each pipeline core consumes approximately **7.5 millicores** 
of CPU due to hardcoded periodic wakeups. At scale this is significant: - -| Cores | Idle CPU (millicores) | Equivalent | -|-------|----------------------|------------| -| 1 | ~7.5 | Barely noticeable | -| 16 | ~120 | Measurable (0.75% of host) | -| 128 | ~960 | Nearly 1 full core wasted | - -## Root Cause: 4 Wakeups/sec Per Core - -Each pipeline core runs a tokio `current_thread` runtime that wakes up **4 times per second** even with zero data flowing: - -| Wakeup Source | Frequency | Location | Configurable? | -|--------------|-----------|----------|---------------| -| Control-plane metrics flush | 1/sec | `pipeline_ctrl.rs` L383 | No (hardcoded) | -| Node telemetry timer × N nodes | N/sec (1 per node) | Every node's `start()` calls `start_periodic_telemetry(1s)` | No (hardcoded) | - -With a typical 3-node pipeline (receiver → processor → exporter): **4 wakeups/sec/core**. - -Each wakeup involves: tokio timer fire → task poll → metric collection → channel send → aggregation → park. This explains ~7.5 millicores per core. - -### Additionally (always-on, not per-core) - -| Task | Frequency | Location | -|------|-----------|----------| -| `engine-metrics` (RSS, CPU sampling) | Every 5s | `controller/src/lib.rs` L1189 | -| `metrics-aggregator` | Event-driven (channel recv) | `controller/src/lib.rs` L1151 | -| `observed-state-store` | Event-driven (channel recv) | `controller/src/lib.rs` L1169 | - -## Measured Data - -### 1-Core Idle (3 runs each, 15s observation window) - -Varying engine config had no meaningful CPU impact at 1 core (all within noise ~0.1-0.2%): - -| Config | Avg CPU % | RAM MiB | -|--------|-----------|---------| -| Baseline (reporting_interval=1s) | 0.11 | 32.8 | -| Slow reporting (reporting_interval=30s) | 0.14 | 11.9 | -| Minimal (30s + logs=error) | 0.12 | 7.5 | -| Noop pipeline (30s + error + noop exporter) | 0.11 | 7.4 | - -### 16-Core Idle (3 runs each) - -Existing telemetry policy knobs show measurable impact at scale: - -| Config | Avg CPU % | RAM MiB | -|--------|-----------|---------| -| ALL ON (default) | 0.75 | 16.0 | -| `tokio_metrics: false` | 0.69 | 15.9 | -| `runtime_metrics: none` | 0.79 | 14.8 | -| ALL OFF | 0.46 | 14.4 | - -ALL OFF reduces CPU by ~40% vs ALL ON, but the irreducible baseline (~0.46%) comes from the hardcoded timer wakeups. - -## Existing Config Knobs - -The engine already has telemetry policy controls (in `policies.telemetry`): - -```yaml -policies: - telemetry: - pipeline_metrics: false # Disables pipeline.metrics set (12 series/core) - tokio_metrics: false # Disables tokio.runtime set (6 series/core) - runtime_metrics: none # Disables channel.* + pipeline.runtime_control sets -``` - -These reduce the number of exposed **metric series** (69 → fewer) and provide modest CPU savings at high core counts, but do NOT reduce the wakeup frequency since the timers still fire. - -## Prometheus Metrics at Idle (1 core, 3 nodes) - -- **46 unique metric names**, **69 time series** -- These scale as: `series ≈ 69 × cores × (nodes/3)` - -| Category (`set` label) | Series | Scales with | -|------------------------|--------|-------------| -| `channel.receiver` | 20 | cores × channels | -| `pipeline.runtime_control` | 20 | cores | -| `channel.sender` | 9 | cores × channels | -| `pipeline.metrics` | 12 | cores | -| `tokio.runtime` | 6 | cores | -| `engine.metrics` | 2 | fixed | - -## Proposed Optimizations - -### High Impact (order of magnitude idle CPU reduction) - -1. 
**Make per-node telemetry collection interval configurable** — Currently every node calls `start_periodic_telemetry(Duration::from_secs(1))` at startup. Extending to 5-10s (or making it respect `reporting_interval`) would eliminate 3 of 4 wakeups/sec/core. - -2. **Make control-plane metrics flush interval configurable** — Currently hardcoded to 1s in `pipeline_ctrl.rs`. Should respect the existing `reporting_interval` config (already user-settable). When set to 10s, this eliminates 1 wakeup/sec/core. - -3. **Combined**: Moving both from 1s → 10s would reduce wakeups from 4/sec/core to 0.4/sec/core — a **10× reduction** in idle CPU, bringing it under 1 millicore/core. - -### Medium Impact - -4. **Make `engine-metrics` interval configurable** — Currently hardcoded to 5s (`controller/src/lib.rs` L1189). There's even a `TODO` comment in the code for this. - -### Lower Priority - -5. **Consider disabling tokio's I/O driver** on cores that don't need it (`.enable_time()` instead of `.enable_all()`). - -6. **Consider lazy telemetry timer registration** — Don't start per-node telemetry timers until the first message arrives (like durable_buffer and other processors already do). - -## Files Referenced - -- Per-node telemetry timer: `crates/core-nodes/src/receivers/otlp_receiver/mod.rs` (and every other node) -- Control-plane metrics flush: `crates/engine/src/pipeline_ctrl.rs` L383 -- Engine metrics interval: `crates/controller/src/lib.rs` L1189 -- Telemetry policy config: `crates/config/src/policy.rs` (TelemetryPolicy struct) -- Reporting interval config: `crates/config/src/pipeline/telemetry.rs` (default 1s) - -## Bug Fix Found During Investigation - -The idle state performance test template had stale Prometheus endpoint URLs (`/telemetry/metrics` instead of `/api/v1/telemetry/metrics`), causing 404 errors during monitoring. This was fixed in `idle-state-template.yaml.j2`. 
diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-direct-zstd-mif20.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-direct-zstd-mif20.yaml deleted file mode 100644 index b8951b84fb..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-direct-zstd-mif20.yaml +++ /dev/null @@ -1,37 +0,0 @@ -version: otel_dataflow/v1 -engine: {} -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:traffic_generator - config: - data_source: static - generation_strategy: pre_generated - traffic_config: - signals_per_second: null - max_batch_size: 50 - metric_weight: 0 - trace_weight: 0 - log_weight: 1 - log_body_size_bytes: 1024 - - exporter: - type: exporter:otlp_grpc - config: - grpc_endpoint: "http://127.0.0.1:4319" - compression: zstd - max_in_flight: 20 - - connections: - - from: receiver - to: exporter diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-direct-zstd.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-direct-zstd.yaml deleted file mode 100644 index 3b67dddef6..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-direct-zstd.yaml +++ /dev/null @@ -1,36 +0,0 @@ -version: otel_dataflow/v1 -engine: {} -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:traffic_generator - config: - data_source: static - generation_strategy: pre_generated - traffic_config: - signals_per_second: null - max_batch_size: 50 - metric_weight: 0 - trace_weight: 0 - log_weight: 1 - log_body_size_bytes: 1024 - - exporter: - type: exporter:otlp_grpc - config: - grpc_endpoint: "http://127.0.0.1:4319" - compression: zstd - - connections: - - from: receiver - to: exporter diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-pregen-zstd-mif20.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-pregen-zstd-mif20.yaml deleted file mode 100644 index 48663c79c5..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-pregen-zstd-mif20.yaml +++ /dev/null @@ -1,37 +0,0 @@ -version: otel_dataflow/v1 -engine: {} -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:traffic_generator - config: - data_source: static - generation_strategy: pre_generated - traffic_config: - signals_per_second: null - max_batch_size: 50 - metric_weight: 0 - trace_weight: 0 - log_weight: 1 - log_body_size_bytes: 1024 - - exporter: - type: exporter:otlp_grpc - config: - grpc_endpoint: "http://127.0.0.1:4317" - compression: zstd - max_in_flight: 20 - - connections: - - from: receiver - to: exporter diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-fresh-gzip.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-fresh-gzip.yaml deleted file mode 100644 index 0c9ab61463..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-fresh-gzip.yaml +++ /dev/null @@ -1,36 +0,0 @@ -version: 
otel_dataflow/v1 -engine: {} -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:traffic_generator - config: - data_source: static - generation_strategy: fresh - traffic_config: - signals_per_second: null - max_batch_size: 50 - metric_weight: 0 - trace_weight: 0 - log_weight: 1 - log_body_size_bytes: 1024 - - exporter: - type: exporter:otlp_grpc - config: - grpc_endpoint: "http://127.0.0.1:4317" - compression: gzip - - connections: - - from: receiver - to: exporter diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-fresh-zstd.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-fresh-zstd.yaml deleted file mode 100644 index 4bc32fbdf0..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-fresh-zstd.yaml +++ /dev/null @@ -1,36 +0,0 @@ -version: otel_dataflow/v1 -engine: {} -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:traffic_generator - config: - data_source: static - generation_strategy: fresh - traffic_config: - signals_per_second: null - max_batch_size: 50 - metric_weight: 0 - trace_weight: 0 - log_weight: 1 - log_body_size_bytes: 1024 - - exporter: - type: exporter:otlp_grpc - config: - grpc_endpoint: "http://127.0.0.1:4317" - compression: zstd - - connections: - - from: receiver - to: exporter diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-pregen-gzip.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-pregen-gzip.yaml deleted file mode 100644 index 359c709992..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-pregen-gzip.yaml +++ /dev/null @@ -1,36 +0,0 @@ -version: otel_dataflow/v1 -engine: {} -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:traffic_generator - config: - data_source: static - generation_strategy: pre_generated - traffic_config: - signals_per_second: null - max_batch_size: 50 - metric_weight: 0 - trace_weight: 0 - log_weight: 1 - log_body_size_bytes: 1024 - - exporter: - type: exporter:otlp_grpc - config: - grpc_endpoint: "http://127.0.0.1:4317" - compression: gzip - - connections: - - from: receiver - to: exporter diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-pregen-zstd.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-pregen-zstd.yaml deleted file mode 100644 index b15817b90a..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sender-static-pregen-zstd.yaml +++ /dev/null @@ -1,36 +0,0 @@ -version: otel_dataflow/v1 -engine: {} -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:traffic_generator - config: - data_source: static - generation_strategy: pre_generated - traffic_config: - signals_per_second: null - max_batch_size: 50 - metric_weight: 0 - trace_weight: 0 - log_weight: 1 - log_body_size_bytes: 1024 - - exporter: - type: exporter:otlp_grpc - config: - 
grpc_endpoint: "http://127.0.0.1:4317" - compression: zstd - - connections: - - from: receiver - to: exporter diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sut-otlp-forward-gzip.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sut-otlp-forward-gzip.yaml deleted file mode 100644 index e68240a5f2..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sut-otlp-forward-gzip.yaml +++ /dev/null @@ -1,32 +0,0 @@ -version: otel_dataflow/v1 -engine: - http_admin: - bind_address: "127.0.0.1:8081" -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:otlp - config: - protocols: - grpc: - listening_addr: "127.0.0.1:4317" - - exporter: - type: exporter:otlp_grpc - config: - grpc_endpoint: "http://127.0.0.1:4319" - compression: gzip - - connections: - - from: receiver - to: exporter diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sut-otlp-forward-zstd.yaml b/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sut-otlp-forward-zstd.yaml deleted file mode 100644 index ac4ac431ee..0000000000 --- a/rust/otap-dataflow/crates/core-nodes/src/receivers/fake_data_generator/bench/sut-otlp-forward-zstd.yaml +++ /dev/null @@ -1,32 +0,0 @@ -version: otel_dataflow/v1 -engine: - http_admin: - bind_address: "127.0.0.1:8081" -groups: - default: - pipelines: - main: - policies: - channel_capacity: - control: - node: 100 - pipeline: 100 - pdata: 128 - - nodes: - receiver: - type: receiver:otlp - config: - protocols: - grpc: - listening_addr: "127.0.0.1:4317" - - exporter: - type: exporter:otlp_grpc - config: - grpc_endpoint: "http://127.0.0.1:4319" - compression: zstd - - connections: - - from: receiver - to: exporter diff --git a/tools/pipeline_perf_test/scripts/saturation_scaling_benchmark.sh b/tools/pipeline_perf_test/scripts/saturation_scaling_benchmark.sh deleted file mode 100755 index c03732306f..0000000000 --- a/tools/pipeline_perf_test/scripts/saturation_scaling_benchmark.sh +++ /dev/null @@ -1,204 +0,0 @@ -#!/bin/bash -# ============================================================================= -# Saturation Scaling Benchmark -# ============================================================================= -# Tests how well the loadgen can saturate the engine (SUT) across different -# core counts and loadgen:engine ratios. -# -# Findings this script validates: -# 1. 1-core and 2-core SUT: 2:1 ratio achieves >95% CPU saturation -# 2. 4-core SUT with 2:1 ratio (8 LG cores): ~75% CPU due to SO_REUSEPORT -# hash imbalance (always 2 hot + 2 cold engine cores) -# 3. 4-core SUT with 1:1 ratio (4 LG cores): ~50% CPU, 1 core at 0% -# 4. 2-container LG doesn't help — Docker bridge IPs don't add enough -# hash entropy -# -# Prerequisites: -# - Docker with df_engine:latest image built -# - Python venv at tools/pipeline_perf_test/.venv with orchestrator deps -# - At least 16 cores available -# -# Usage: -# cd tools/pipeline_perf_test -# bash scripts/saturation_scaling_benchmark.sh -# -# Runtime: ~15 minutes (5 configs × 2 runs × ~90s each) -# ============================================================================= - -set -eo pipefail -SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" -PERF_DIR="$(cd "$SCRIPT_DIR/.." 
&& pwd)" -cd "$PERF_DIR" - -source .venv/bin/activate - -RESULTS_DIR="$PERF_DIR/results/scaling_benchmark" -mkdir -p "$RESULTS_DIR" - -RUNS_PER_CONFIG=2 -METRIC_CAPTURE_DELAY=50 # seconds after test start to capture per-core metrics - -# ============================================================================= -# Helper: run a test config, capture per-core metrics, return summary -# ============================================================================= -run_test() { - local label="$1" - local config="$2" - local run_num="$3" - local log_file="$RESULTS_DIR/${label//[ \/]/_}_run${run_num}.log" - - echo " [$label] Run $run_num starting..." - - # Run orchestrator in background - python ./orchestrator/run_orchestrator.py --config "$config" \ - 2>&1 > "$log_file" & - local pid=$! - - # Wait for observation phase, then capture per-core CPU - sleep $METRIC_CAPTURE_DELAY - - local engine_cores="" - engine_cores=$(curl -s "http://localhost:8086/api/v1/telemetry/metrics?reset=false" 2>/dev/null \ - | awk '/cpu_utilization\{set="pipeline.metrics"/{ - match($0, /core_id="([^"]+)"/, c); - printf "%s:%.1f ", c[1], $2*100 - }') || true - - # Wait for test to complete - wait $pid || true - - # Extract summary metrics from log - local cpu_norm=$(grep "cpu_percentage_normalized_avg" "$log_file" 2>/dev/null | tail -1 | tr -d '|' | awk '{for(i=1;i<=NF;i++) if($i+0 > 1 && $i ~ /^[0-9]/) print $i}' | head -1) - local throughput=$(grep "logs_produced_rate" "$log_file" 2>/dev/null | tail -1 | tr -d '|' | awk '{for(i=1;i<=NF;i++) if($i+0 > 1000 && $i ~ /^[0-9]/) print $i}' | head -1) - - # Count cores at 0% and cores >90% - local zero_cores=0 - local hot_cores=0 - for pair in $engine_cores; do - local val="${pair#*:}" - val="${val%%%*}" - local is_zero=$(awk "BEGIN{print ($val < 1.0) ? 1 : 0}" 2>/dev/null || echo 0) - local is_hot=$(awk "BEGIN{print ($val > 90.0) ? 
1 : 0}" 2>/dev/null || echo 0) - [ "$is_zero" = "1" ] && zero_cores=$((zero_cores + 1)) - [ "$is_hot" = "1" ] && hot_cores=$((hot_cores + 1)) - done - - # Record result - echo "$label|$run_num|${cpu_norm:-n/a}|${throughput:-n/a}|$engine_cores|$zero_cores|$hot_cores" >> "$RESULTS_DIR/all_results.csv" - - echo " [$label] Run $run_num: CPU=${cpu_norm:-?}% Throughput=${throughput:-?} logs/s Per-core: $engine_cores (${zero_cores} idle, ${hot_cores} hot)" -} - -# ============================================================================= -# Main -# ============================================================================= -echo "" -echo "=================================================================" -echo " Saturation Scaling Benchmark" -echo " $RUNS_PER_CONFIG runs per configuration" -echo "=================================================================" -echo "" - -# Clear previous results -> "$RESULTS_DIR/all_results.csv" -echo "label|run|cpu_norm_avg|throughput|per_core_cpu|zero_cores|hot_cores" >> "$RESULTS_DIR/all_results.csv" - -# --- Config 1: 1 SUT core, 2 LG cores (2:1) --- -echo "--- [1/5] 1-core SUT, 2:1 ratio (2 LG cores) ---" -for run in $(seq 1 $RUNS_PER_CONFIG); do - run_test "1core-2to1" \ - "./test_suites/integration/continuous/saturation-1core.yaml" \ - "$run" -done -echo "" - -# --- Config 2: 2 SUT cores, 4 LG cores (2:1) --- -echo "--- [2/5] 2-core SUT, 2:1 ratio (4 LG cores) ---" -for run in $(seq 1 $RUNS_PER_CONFIG); do - run_test "2core-2to1" \ - "./test_suites/integration/continuous/saturation-2cores.yaml" \ - "$run" -done -echo "" - -# --- Config 3: 4 SUT cores, 8 LG cores (2:1) --- -echo "--- [3/5] 4-core SUT, 2:1 ratio (8 LG cores, 1 container) ---" -for run in $(seq 1 $RUNS_PER_CONFIG); do - run_test "4core-2to1" \ - "./test_suites/integration/continuous/saturation-4cores.yaml" \ - "$run" -done -echo "" - -# --- Config 4: 4 SUT cores, 4 LG cores (1:1) --- -# Uses a temp config with 1:1 ratio -TEMP_1TO1="$RESULTS_DIR/_sat4_1to1.yaml" -cat > "$TEMP_1TO1" <<'EOF' -from_template: - path: test_suites/integration/continuous/saturation-cores-template.yaml.j2 - variables: - num_cores: 4 - engine_core_range: "0-3" - loadgen_cores: 4 - loadgen_core_range: "4-7" - backend_cores: 4 - backend_core_range: "12-15" - max_batch_size: 512 -EOF - -echo "--- [4/5] 4-core SUT, 1:1 ratio (4 LG cores, 1 container) ---" -for run in $(seq 1 $RUNS_PER_CONFIG); do - run_test "4core-1to1" "$TEMP_1TO1" "$run" -done -echo "" - -# --- Config 5: 4 SUT cores, 2 LG containers × 2 cores (2:1, 2 containers) --- -echo "--- [5/5] 4-core SUT, 2:1 ratio (2×2 LG cores, 2 containers) ---" -for run in $(seq 1 $RUNS_PER_CONFIG); do - run_test "4core-2lg" \ - "./test_suites/integration/continuous/saturation-4cores-2lg.yaml" \ - "$run" -done -echo "" - -# ============================================================================= -# Summary -# ============================================================================= -echo "=================================================================" -echo " SUMMARY" -echo "=================================================================" -echo "" -printf "%-20s %8s %12s %8s %8s\n" "Config" "CPU%" "Logs/sec" "Idle" "Hot" -echo "--------------------------------------------------------------" - -# Compute averages per config -for label in "1core-2to1" "2core-2to1" "4core-2to1" "4core-1to1" "4core-2lg"; do - avg_cpu=$(grep "^$label|" "$RESULTS_DIR/all_results.csv" | awk -F'|' '{s+=$3; n++} END{if(n>0) printf "%.1f", s/n; else print "n/a"}') - avg_tput=$(grep 
"^$label|" "$RESULTS_DIR/all_results.csv" | awk -F'|' '{s+=$4; n++} END{if(n>0) printf "%.0f", s/n; else print "n/a"}') - avg_zero=$(grep "^$label|" "$RESULTS_DIR/all_results.csv" | awk -F'|' '{s+=$6; n++} END{if(n>0) printf "%.1f", s/n; else print "n/a"}') - avg_hot=$(grep "^$label|" "$RESULTS_DIR/all_results.csv" | awk -F'|' '{s+=$7; n++} END{if(n>0) printf "%.1f", s/n; else print "n/a"}') - printf "%-20s %8s %12s %8s %8s\n" "$label" "$avg_cpu" "$avg_tput" "$avg_zero" "$avg_hot" -done - -echo "" -echo "Columns:" -echo " CPU% = Normalized avg CPU of SUT (100% = all cores saturated)" -echo " Logs/sec = Throughput through the SUT" -echo " Idle = Avg # of engine cores at 0% (SO_REUSEPORT imbalance)" -echo " Hot = Avg # of engine cores at >90%" -echo "" -echo "Key observations:" -echo " - 1-core and 2-core: >95% CPU, properly saturated with 2:1 ratio" -echo " - 4-core 2:1: ~75% CPU, 2 hot + 2 cold cores (reuseport hash skew)" -echo " - 4-core 1:1: ~50% CPU, 1 core gets 0 connections" -echo " - 4-core 2-container: same as 1:1, Docker bridge IPs don't help" -echo "" -echo "Conclusion: 2:1 ratio with single container is the best practical" -echo "approach. The ~75% ceiling at 4+ cores is an SO_REUSEPORT limitation" -echo "that needs engine-level fixes (eBPF rebalancing or topic-based split)." -echo "" -echo "Raw results: $RESULTS_DIR/all_results.csv" -echo "Per-run logs: $RESULTS_DIR/*.log" - -# Cleanup temp config -rm -f "$TEMP_1TO1" diff --git a/tools/pipeline_perf_test/test_suites/integration/continuous/saturation-2lg-template.yaml.j2 b/tools/pipeline_perf_test/test_suites/integration/continuous/saturation-2lg-template.yaml.j2 deleted file mode 100644 index 82d981a80d..0000000000 --- a/tools/pipeline_perf_test/test_suites/integration/continuous/saturation-2lg-template.yaml.j2 +++ /dev/null @@ -1,104 +0,0 @@ -# Template for core scaling saturation tests with 2 separate loadgen containers. -# Using separate containers gives different source IPs, improving SO_REUSEPORT -# hash entropy and achieving better load distribution across engine cores. 
-name: Continuous - Saturation - {{num_cores}} Core(s) (2-LG)
-components:
-  load-generator-1:
-    deployment:
-      docker:
-        image: df_engine:latest
-        network: testbed
-        ports:
-          - "8085:8080"
-        volumes:
-          - 'test_suites/integration/configs/loadgen/config.rendered.yaml:/home/dataflow/config.yaml:ro'
-        command:
-          - "--config"
-          - "./config.yaml"
-          - "--core-id-range"
-          - "{{loadgen1_core_range}}"
-          - "--http-admin-bind"
-          - "0.0.0.0:8080"
-    monitoring:
-      docker_component:
-        allocated_cores: {{loadgen1_cores}}
-      prometheus:
-        endpoint: http://localhost:8085/api/v1/telemetry/metrics?format=prometheus&reset=false
-  load-generator-2:
-    deployment:
-      docker:
-        image: df_engine:latest
-        network: testbed
-        ports:
-          - "8095:8080"
-        volumes:
-          - 'test_suites/integration/configs/loadgen/config2.rendered.yaml:/home/dataflow/config.yaml:ro'
-        command:
-          - "--config"
-          - "./config.yaml"
-          - "--core-id-range"
-          - "{{loadgen2_core_range}}"
-          - "--http-admin-bind"
-          - "0.0.0.0:8080"
-    monitoring:
-      docker_component:
-        allocated_cores: {{loadgen2_cores}}
-      prometheus:
-        endpoint: http://localhost:8095/api/v1/telemetry/metrics?format=prometheus&reset=false
-  df-engine:
-    deployment:
-      docker:
-        image: df_engine:latest
-        network: testbed
-        ports:
-          - "8086:8080"
-        volumes:
-          - 'test_suites/integration/configs/engine/config.rendered.yaml:/home/dataflow/config.yaml:ro'
-        command:
-          - "--config"
-          - "./config.yaml"
-          - "--core-id-range"
-          - "{{engine_core_range}}"
-          - "--http-admin-bind"
-          - "0.0.0.0:8080"
-    monitoring:
-      docker_component:
-        allocated_cores: {{num_cores}}
-      prometheus:
-        endpoint: http://localhost:8086/api/v1/telemetry/metrics?format=prometheus&reset=false
-  backend-service:
-    deployment:
-      docker:
-        image: df_engine:latest
-        network: testbed
-        ports:
-          - "8087:8080"
-        volumes:
-          - 'test_suites/integration/configs/backend/config.rendered.yaml:/home/dataflow/config.yaml:ro'
-        command:
-          - "--config"
-          - "./config.yaml"
-          - "--core-id-range"
-          - "{{backend_core_range}}"
-          - "--http-admin-bind"
-          - "0.0.0.0:8080"
-    monitoring:
-      docker_component:
-        allocated_cores: {{backend_cores}}
-      prometheus:
-        endpoint: http://localhost:8087/api/v1/telemetry/metrics?format=prometheus&reset=false
-
-tests:
-  - name: OTLP-ATTR-OTLP
-    from_template:
-      path: test_suites/integration/templates/test_steps/df-2loadgen-steps-docker.yaml
-      variables:
-        result_dir: "continuous_saturation_{{num_cores}}core_2lg_otlp"
-        engine_config_template: test_suites/integration/templates/configs/engine/backpressure/otlp-attr-otlp.yaml
-        loadgen_exporter_type: otlp
-        backend_receiver_type: otlp
-        observation_interval: 60
-        signals_per_second: null
-        max_batch_size: {{max_batch_size}}
-        data_source: static
-        log_body_size_bytes: 1024
diff --git a/tools/pipeline_perf_test/test_suites/integration/continuous/saturation-4cores-2lg.yaml b/tools/pipeline_perf_test/test_suites/integration/continuous/saturation-4cores-2lg.yaml
deleted file mode 100644
index ca373217b3..0000000000
--- a/tools/pipeline_perf_test/test_suites/integration/continuous/saturation-4cores-2lg.yaml
+++ /dev/null
@@ -1,19 +0,0 @@
-# Saturation test: 4-core engine with 2 separate loadgen containers.
-# Two containers = different source IPs = better SO_REUSEPORT distribution.
-# Layout (16 cores total):
-# Engine: cores 0-3 (4 cores)
-# Loadgen 1: cores 4-5 (2 cores)
-# Loadgen 2: cores 6-7 (2 cores)
-# Backend: cores 12-15 (4 cores)
-from_template:
-  path: test_suites/integration/continuous/saturation-2lg-template.yaml.j2
-  variables:
-    num_cores: 4
-    engine_core_range: "0-3"
-    loadgen1_cores: 2
-    loadgen1_core_range: "4-5"
-    loadgen2_cores: 2
-    loadgen2_core_range: "6-7"
-    backend_cores: 4
-    backend_core_range: "12-15"
-    max_batch_size: 512
diff --git a/tools/pipeline_perf_test/test_suites/integration/templates/test_steps/df-2loadgen-steps-docker.yaml b/tools/pipeline_perf_test/test_suites/integration/templates/test_steps/df-2loadgen-steps-docker.yaml
deleted file mode 100644
index 4a0b9a2ad1..0000000000
--- a/tools/pipeline_perf_test/test_suites/integration/templates/test_steps/df-2loadgen-steps-docker.yaml
+++ /dev/null
@@ -1,210 +0,0 @@
-# Test steps for dual load-generator setup.
-# Two separate Docker containers (different source IPs) improve SO_REUSEPORT
-# hash distribution across engine receiver cores.
-steps:
-  - name: Deploy Backend Engine
-    action:
-      component_action:
-        phase: deploy
-        target: backend-service
-    hooks:
-      run:
-        pre:
-          - render_template:
-              template_path: './test_suites/integration/templates/configs/backend/config.yaml.j2'
-              output_path: ./test_suites/integration/configs/backend/config.rendered.yaml
-              variables:
-                backend_receiver_type: '{{backend_receiver_type}}'
-        post:
-          - ready_check_http:
-              url: http://localhost:8087/api/v1/telemetry/metrics?reset=false
-              method: GET
-              expected_status_code: 200
-  - name: Deploy Dataflow Engine
-    action:
-      component_action:
-        phase: deploy
-        target: df-engine
-    hooks:
-      run:
-        pre:
-          - render_template:
-              template_path: '{{engine_config_template}}'
-              output_path: ./test_suites/integration/configs/engine/config.rendered.yaml
-              variables:
-                backend_hostname: backend-service
-        post:
-          - ready_check_http:
-              url: http://localhost:8086/api/v1/telemetry/metrics?reset=false
-              method: GET
-              expected_status_code: 200
-  - name: Deploy Load Generator 1
-    action:
-      component_action:
-        phase: deploy
-        target: load-generator-1
-    hooks:
-      run:
-        pre:
-          - render_template:
-              template_path: './test_suites/integration/templates/configs/loadgen/config.yaml.j2'
-              output_path: ./test_suites/integration/configs/loadgen/config.rendered.yaml
-              variables:
-                max_batch_size: {{max_batch_size | default(1000)}}
-                signals_per_second: {% if signals_per_second is none %}null{% else %}{{signals_per_second | default(100000)}}{% endif %}
-                metric_weight: {{metric_weight | default(0)}}
-                trace_weight: {{trace_weight | default(0)}}
-                log_weight: {{log_weight | default(100)}}
-                engine_hostname: df-engine
-                loadgen_exporter_type: '{{loadgen_exporter_type}}'
-                data_source: {{data_source | default("semantic_conventions")}}
-                {% if log_body_size_bytes is defined %}log_body_size_bytes: {{log_body_size_bytes}}{% endif %}
-        post:
-          - ready_check_http:
-              url: http://localhost:8085/api/v1/telemetry/metrics?reset=false
-              method: GET
-              expected_status_code: 200
-  - name: Deploy Load Generator 2
-    action:
-      component_action:
-        phase: deploy
-        target: load-generator-2
-    hooks:
-      run:
-        pre:
-          # Render a second copy of the loadgen config (identical settings, different file)
-          - render_template:
-              template_path: './test_suites/integration/templates/configs/loadgen/config.yaml.j2'
-              output_path: ./test_suites/integration/configs/loadgen/config2.rendered.yaml
-              variables:
-                max_batch_size: {{max_batch_size | default(1000)}}
-                signals_per_second: {% if signals_per_second is none %}null{% else %}{{signals_per_second | default(100000)}}{% endif %}
-                metric_weight: {{metric_weight | default(0)}}
-                trace_weight: {{trace_weight | default(0)}}
-                log_weight: {{log_weight | default(100)}}
-                engine_hostname: df-engine
-                loadgen_exporter_type: '{{loadgen_exporter_type}}'
-                data_source: {{data_source | default("semantic_conventions")}}
-                {% if log_body_size_bytes is defined %}log_body_size_bytes: {{log_body_size_bytes}}{% endif %}
-        post:
-          - ready_check_http:
-              url: http://localhost:8095/api/v1/telemetry/metrics?reset=false
-              method: GET
-              expected_status_code: 200
-  - name: Monitor All
-    action:
-      multi_component_action:
-        phase: start_monitoring
-        targets:
-          - backend-service
-          - load-generator-1
-          - load-generator-2
-          - df-engine
-  - name: Wait for data
-    action:
-      wait:
-        delay_seconds: {{wait_for_data_interval | default(5)}}
-  - name: Observe Load
-    action:
-      wait:
-        delay_seconds: {{observation_interval | default(20)}}
-    hooks:
-      run:
-        pre:
-          - record_event:
-              name: observation_start
-        post:
-          - record_event:
-              name: observation_stop
-  - name: Stop Load Generator 1
-    hooks:
-      run:
-        pre:
-          - send_http_request:
-              url: http://localhost:8085/api/v1/pipeline-groups/shutdown?wait=true&timeout_secs={{drain_timeout_secs | default(60)}}
-              method: POST
-              headers:
-                "Content-Type": "application/json"
-              timeout: {{drain_timeout_secs | default(60) | int + 5}}
-    action:
-      no_op: {}
-  - name: Stop Load Generator 2
-    hooks:
-      run:
-        pre:
-          - send_http_request:
-              url: http://localhost:8095/api/v1/pipeline-groups/shutdown?wait=true&timeout_secs={{drain_timeout_secs | default(60)}}
-              method: POST
-              headers:
-                "Content-Type": "application/json"
-              timeout: {{drain_timeout_secs | default(60) | int + 5}}
-    action:
-      no_op: {}
-  - name: Stop Engine
-    hooks:
-      run:
-        pre:
-          - send_http_request:
-              url: http://localhost:8086/api/v1/pipeline-groups/shutdown?wait=true&timeout_secs={{drain_timeout_secs | default(60)}}
-              method: POST
-              headers:
-                "Content-Type": "application/json"
-              timeout: {{drain_timeout_secs | default(60) | int + 5}}
-    action:
-      no_op: {}
-  - name: Stop Backend
-    hooks:
-      run:
-        pre:
-          - send_http_request:
-              url: http://localhost:8087/api/v1/pipeline-groups/shutdown?wait=true&timeout_secs={{drain_timeout_secs | default(60)}}
-              method: POST
-              headers:
-                "Content-Type": "application/json"
-              timeout: {{drain_timeout_secs | default(60) | int + 5}}
-    action:
-      no_op: {}
-  - name: Wait For Metrics Update
-    action:
-      wait:
-        delay_seconds: 1
-  - name: Stop Monitoring All
-    action:
-      multi_component_action:
-        phase: stop_monitoring
-        targets:
-          - backend-service
-          - load-generator-1
-          - load-generator-2
-          - df-engine
-  - name: Destroy All
-    action:
-      multi_component_action:
-        phase: destroy
-        targets:
-          - load-generator-1
-          - load-generator-2
-          - df-engine
-          - backend-service
-  - name: Run Report
-    action:
-      wait:
-        delay_seconds: 0
-    hooks:
-      run:
-        post:
-          - print_container_logs: {}
-          - sql_report:
-              name: Integration Report - Logs
-              report_config_file: {{report_config | default('./test_suites/integration/configs/integration_report_logs.yaml')}}
-              output:
-                - format:
-                    template: {}
-                  destination:
-                    console: {}
-                - format:
-                    template:
-                      path: ./test_suites/integration/templates/reports/gh-action-sqlreport.j2
-                  destination:
-                    file:
-                      directory: results/{{result_dir | default('integration')}}/gh-actions-benchmark

From 8f5d06aa9f8ba3caf7264214145aa255e96928f4 Mon Sep 17 00:00:00 2001
From: Cithomas
Date: Tue, 12 May 2026 14:44:49 +0000
Subject: [PATCH 4/5] fix: remove unused Duration import in geneva_exporter

---
 .../crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs
index 327483de0d..bfa929ca44 100644
--- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs
+++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/geneva_exporter/mod.rs
@@ -52,7 +52,7 @@ use otap_df_telemetry_macros::metric_set;
 use serde::Deserialize;
 use std::path::PathBuf;
 use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Instant;
 
 // Geneva uploader dependencies
 use futures::StreamExt;

From d1397d8f13bcb03192ae3715e6515e34b6825ee9 Mon Sep 17 00:00:00 2001
From: Cithomas
Date: Tue, 12 May 2026 21:19:32 +0000
Subject: [PATCH 5/5] Remove start_periodic_telemetry from host_metrics_receiver

This node was added while the PR was in flight and still called
start_periodic_telemetry(1s) directly, bypassing the centralized timer
registration.
---
 .../core-nodes/src/receivers/host_metrics_receiver/mod.rs | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/mod.rs
index 8ba632865b..7b75424f67 100644
--- a/rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/mod.rs
+++ b/rust/otap-dataflow/crates/core-nodes/src/receivers/host_metrics_receiver/mod.rs
@@ -481,10 +481,6 @@ impl local::Receiver for HostMetricsReceiver {
         })?;
 
         let mut scheduler = FamilyScheduler::new(&config, Instant::now());
-        let _ = effect_handler
-            .start_periodic_telemetry(Duration::from_secs(1))
-            .await?;
-
         loop {
             tokio::select! {
                 biased;