From c730560d57d2c6270e37dad7f26476f0b7f6bc98 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 8 May 2026 16:42:25 +0000 Subject: [PATCH 1/4] telemetry: thread engine.observed_state.logging_events policy to ITS reporter The InternalTelemetrySystem ITS reporter was hard-coded to use SendPolicy::default(), which has console_fallback: true. Under load this can produce hundreds of thousands of 'Channel full, dropping observed event' raw_error! lines on stderr per second when the ITS flume channel saturates, which: - drowns out useful diagnostic output - silently dominates /proc/<pid>/io wchar so any wire-bytes measurement based on that counter is contaminated - costs measurable CPU writing those messages The engine config already has the right knob, engine.observed_state.logging_events, with default console_fallback: false for the logging path (only the engine_events path defaults to true). It just was not being plumbed to the ITS reporter. Add an its_reporter_policy: SendPolicy parameter to InternalTelemetrySystem::new and use it for both reporter constructions in the ITS branch (with and without log tap). Pass engine.observed_state.logging_events.clone() from the controller startup. All test call sites updated to pass SendPolicy::default() to preserve their existing behaviour. Verified with cargo test -p otap-df-telemetry --lib (126 passed). Verified end-to-end: a logger configured with global: its at 10k/s and a downstream that backpressures (forcing channel-full drops at ~50%) now produces 0 'Channel full' lines vs the prior 150k lines in 30s, and process CPU drops correspondingly. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- rust/otap-dataflow/crates/controller/src/lib.rs | 3 ++- rust/otap-dataflow/crates/telemetry/src/lib.rs | 9 +++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index c5b9d42168..247be7d6af 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -1182,7 +1182,7 @@ impl< .logs .providers .uses_console_async_provider() - .then(|| obs_state_store.reporter(engine.observed_state.logging_events)); + .then(|| obs_state_store.reporter(engine.observed_state.logging_events.clone())); // Create the telemetry system. The console_async_reporter is passed when any // providers use ConsoleAsync. The its_logs_receiver is passed when any @@ -1191,6 +1191,7 @@ impl< telemetry_config, telemetry_registry.clone(), console_async_reporter, + engine.observed_state.logging_events.clone(), engine_context, log_tap_handle.clone(), )?; diff --git a/rust/otap-dataflow/crates/telemetry/src/lib.rs b/rust/otap-dataflow/crates/telemetry/src/lib.rs index ecb7b93143..0f157598f9 100644 --- a/rust/otap-dataflow/crates/telemetry/src/lib.rs +++ b/rust/otap-dataflow/crates/telemetry/src/lib.rs @@ -234,6 +234,7 @@ impl InternalTelemetrySystem { config: &TelemetryConfig, telemetry_registry: TelemetryRegistryHandle, console_async_reporter: Option, + its_reporter_policy: SendPolicy, context_fn: LogContextFn, log_tap_handle: Option, ) -> Result { @@ -261,10 +262,10 @@ impl InternalTelemetrySystem { let (its_reporter, its_settings) = if config.logs.providers.uses_its_provider() { let (sender, logs_receiver) = flume::bounded(config.reporting_channel_size); let reporter = if let Some(log_tap) = &log_tap_handle { - ObservedEventReporter::new(SendPolicy::default(), sender) + ObservedEventReporter::new(its_reporter_policy.clone(), sender) 
.with_drop_counter(log_tap.ingest_drop_counter()) } else { - ObservedEventReporter::new(SendPolicy::default(), sender) + ObservedEventReporter::new(its_reporter_policy.clone(), sender) }; let resource_bytes = otel_sdk::encode_resource_bytes(&config.resource); ( @@ -423,6 +424,7 @@ impl Default for InternalTelemetrySystem { &config, TelemetryRegistryHandle::new(), Some(dummy_reporter), + SendPolicy::default(), LogContext::new, None, ) @@ -468,6 +470,7 @@ mod tests { &TelemetryConfig::default(), TelemetryRegistryHandle::new(), Some(test_reporter()), + SendPolicy::default(), LogContext::new, None, ) @@ -488,6 +491,7 @@ mod tests { &config_with_providers(providers), TelemetryRegistryHandle::new(), Some(test_reporter()), + SendPolicy::default(), LogContext::new, None, ) @@ -529,6 +533,7 @@ mod tests { &config, TelemetryRegistryHandle::new(), Some(test_reporter()), + SendPolicy::default(), LogContext::new, None, ) From 2df990ed5f099b15e9218e912bff14f1c94fb1d8 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Sat, 9 May 2026 00:54:21 +0000 Subject: [PATCH 2/4] yes From 1f142c80de9aeb3a47b349856aac29731ecb4dfd Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Sat, 9 May 2026 01:16:46 +0000 Subject: [PATCH 3/4] edit --- rust/otap-dataflow/crates/telemetry/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/otap-dataflow/crates/telemetry/src/lib.rs b/rust/otap-dataflow/crates/telemetry/src/lib.rs index 0f157598f9..06838bba72 100644 --- a/rust/otap-dataflow/crates/telemetry/src/lib.rs +++ b/rust/otap-dataflow/crates/telemetry/src/lib.rs @@ -234,7 +234,7 @@ impl InternalTelemetrySystem { config: &TelemetryConfig, telemetry_registry: TelemetryRegistryHandle, console_async_reporter: Option, - its_reporter_policy: SendPolicy, + logging_send_policy: SendPolicy, context_fn: LogContextFn, log_tap_handle: Option, ) -> Result { @@ -262,10 +262,10 @@ impl InternalTelemetrySystem { let (its_reporter, its_settings) = if 
config.logs.providers.uses_its_provider() { let (sender, logs_receiver) = flume::bounded(config.reporting_channel_size); let reporter = if let Some(log_tap) = &log_tap_handle { - ObservedEventReporter::new(its_reporter_policy.clone(), sender) + ObservedEventReporter::new(logging_send_policy.clone(), sender) .with_drop_counter(log_tap.ingest_drop_counter()) } else { - ObservedEventReporter::new(its_reporter_policy.clone(), sender) + ObservedEventReporter::new(logging_send_policy.clone(), sender) }; let resource_bytes = otel_sdk::encode_resource_bytes(&config.resource); ( From 93537fc1ea1b866d7e0c85767fa8ab804240dac1 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 12 May 2026 08:36:15 -0700 Subject: [PATCH 4/4] telemetry: add test that ITS reporter honors configured logging send policy Covers both code paths in InternalTelemetrySystem::new where the ITS reporter is constructed (with and without an InternalLogTapHandle), verifying that the configured engine.observed_state.logging_events SendPolicy is threaded through rather than replaced with SendPolicy::default(). Adds test-only accessors: - ObservedEventReporter::policy() - InternalTelemetrySystem::its_reporter() Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../crates/telemetry/src/event.rs | 7 ++ .../otap-dataflow/crates/telemetry/src/lib.rs | 78 ++++++++++++++++++- 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/rust/otap-dataflow/crates/telemetry/src/event.rs b/rust/otap-dataflow/crates/telemetry/src/event.rs index 420199766b..62dd4183c7 100644 --- a/rust/otap-dataflow/crates/telemetry/src/event.rs +++ b/rust/otap-dataflow/crates/telemetry/src/event.rs @@ -68,6 +68,13 @@ impl ObservedEventReporter { self } + /// Returns the configured send policy. Test-only accessor used to verify + /// that the policy was threaded through correctly during construction. 
+ #[cfg(test)] + pub(crate) fn policy(&self) -> &SendPolicy { + &self.policy + } + /// Report an engine event. /// /// When a dedicated engine sender is configured, the event is delivered diff --git a/rust/otap-dataflow/crates/telemetry/src/lib.rs b/rust/otap-dataflow/crates/telemetry/src/lib.rs index 06838bba72..cdf63e9727 100644 --- a/rust/otap-dataflow/crates/telemetry/src/lib.rs +++ b/rust/otap-dataflow/crates/telemetry/src/lib.rs @@ -370,6 +370,13 @@ impl InternalTelemetrySystem { self.log_tap_handle.clone() } + /// Test-only accessor for the ITS reporter, used to verify that the + /// configured logging send policy is threaded through during construction. + #[cfg(test)] + pub(crate) fn its_reporter(&self) -> Option<&ObservedEventReporter> { + self.its_reporter.as_ref() + } + /// Returns the configured log level. #[must_use] pub const fn log_level(&self) -> &LogLevel { @@ -438,7 +445,9 @@ mod tests { use otap_df_config::pipeline::telemetry::{ AttributeValue::I64 as OTelI64, AttributeValue::String as OTelString, }; - use otap_df_config::settings::telemetry::logs::{LoggingProviders, LogsConfig, ProviderMode}; + use otap_df_config::settings::telemetry::logs::{ + InternalLogTapConfig, LoggingProviders, LogsConfig, ProviderMode, + }; use otap_df_pdata::proto::OtlpProtoMessage; use otap_df_pdata::proto::opentelemetry::common::v1::{AnyValue, KeyValue}; use otap_df_pdata::proto::opentelemetry::logs::{v1::LogsData, v1::ResourceLogs}; @@ -517,6 +526,73 @@ mod tests { }); } + #[test] + fn its_reporter_honors_configured_logging_send_policy() { + // Verifies the fix that threads `engine.observed_state.logging_events` + // through to the ITS reporter, covering both code paths: + // 1. log_tap_handle: None + // 2. 
log_tap_handle: Some(_) (sets the drop counter on the reporter) + // + // Before the fix, both paths constructed the ITS reporter with + // `SendPolicy::default()` (which has `console_fallback: true`), + // so a user-provided `console_fallback: false` was silently ignored. + with_cleared_rust_log(|| { + let providers = LoggingProviders { + global: ProviderMode::Noop, + engine: ProviderMode::ITS, + internal: ProviderMode::Noop, + admin: ProviderMode::Noop, + }; + let custom_policy = SendPolicy { + blocking_timeout: Some(Duration::from_millis(7)), + console_fallback: false, + }; + + // Case 1: no log tap handle. + let its = InternalTelemetrySystem::new( + &config_with_providers(providers.clone()), + TelemetryRegistryHandle::new(), + Some(test_reporter()), + custom_policy.clone(), + LogContext::new, + None, + ) + .expect("should create"); + let policy = its + .its_reporter() + .expect("ITS provider configured") + .policy(); + assert_eq!( + policy, &custom_policy, + "ITS reporter (no log tap) must use configured logging_events policy" + ); + + // Case 2: with a log tap handle (reporter additionally gets the drop counter). + let log_tap = log_tap::build(&InternalLogTapConfig { + enabled: true, + max_entries: 1, + max_bytes: usize::MAX, + }); + let its = InternalTelemetrySystem::new( + &config_with_providers(providers), + TelemetryRegistryHandle::new(), + Some(test_reporter()), + custom_policy.clone(), + LogContext::new, + Some(log_tap), + ) + .expect("should create"); + let policy = its + .its_reporter() + .expect("ITS provider configured") + .policy(); + assert_eq!( + policy, &custom_policy, + "ITS reporter (with log tap) must use configured logging_events policy" + ); + }); + } + #[test] fn resource_bytes() { let mut config = TelemetryConfig::default();