diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index c5b9d42168..247be7d6af 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -1182,7 +1182,7 @@ impl< .logs .providers .uses_console_async_provider() - .then(|| obs_state_store.reporter(engine.observed_state.logging_events)); + .then(|| obs_state_store.reporter(engine.observed_state.logging_events.clone())); // Create the telemetry system. The console_async_reporter is passed when any // providers use ConsoleAsync. The its_logs_receiver is passed when any @@ -1191,6 +1191,7 @@ impl< telemetry_config, telemetry_registry.clone(), console_async_reporter, + engine.observed_state.logging_events.clone(), engine_context, log_tap_handle.clone(), )?; diff --git a/rust/otap-dataflow/crates/telemetry/src/lib.rs b/rust/otap-dataflow/crates/telemetry/src/lib.rs index ecb7b93143..06838bba72 100644 --- a/rust/otap-dataflow/crates/telemetry/src/lib.rs +++ b/rust/otap-dataflow/crates/telemetry/src/lib.rs @@ -234,6 +234,7 @@ impl InternalTelemetrySystem { config: &TelemetryConfig, telemetry_registry: TelemetryRegistryHandle, console_async_reporter: Option, + logging_send_policy: SendPolicy, context_fn: LogContextFn, log_tap_handle: Option, ) -> Result { @@ -261,10 +262,10 @@ impl InternalTelemetrySystem { let (its_reporter, its_settings) = if config.logs.providers.uses_its_provider() { let (sender, logs_receiver) = flume::bounded(config.reporting_channel_size); let reporter = if let Some(log_tap) = &log_tap_handle { - ObservedEventReporter::new(SendPolicy::default(), sender) + ObservedEventReporter::new(logging_send_policy.clone(), sender) .with_drop_counter(log_tap.ingest_drop_counter()) } else { - ObservedEventReporter::new(SendPolicy::default(), sender) + ObservedEventReporter::new(logging_send_policy.clone(), sender) }; let resource_bytes = otel_sdk::encode_resource_bytes(&config.resource); ( @@ -423,6 +424,7 @@ impl Default for InternalTelemetrySystem { &config, TelemetryRegistryHandle::new(), Some(dummy_reporter), + SendPolicy::default(), LogContext::new, None, ) @@ -468,6 +470,7 @@ mod tests { &TelemetryConfig::default(), TelemetryRegistryHandle::new(), Some(test_reporter()), + SendPolicy::default(), LogContext::new, None, ) @@ -488,6 +491,7 @@ mod tests { &config_with_providers(providers), TelemetryRegistryHandle::new(), Some(test_reporter()), + SendPolicy::default(), LogContext::new, None, ) @@ -529,6 +533,7 @@ mod tests { &config, TelemetryRegistryHandle::new(), Some(test_reporter()), + SendPolicy::default(), LogContext::new, None, )