Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rust/otap-dataflow/crates/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ impl<
.logs
.providers
.uses_console_async_provider()
.then(|| obs_state_store.reporter(engine.observed_state.logging_events));
.then(|| obs_state_store.reporter(engine.observed_state.logging_events.clone()));

// Create the telemetry system. The console_async_reporter is passed when any
// providers use ConsoleAsync. The its_logs_receiver is passed when any
Expand All @@ -1191,6 +1191,7 @@ impl<
telemetry_config,
telemetry_registry.clone(),
console_async_reporter,
engine.observed_state.logging_events.clone(),
engine_context,
log_tap_handle.clone(),
)?;
Expand Down
9 changes: 7 additions & 2 deletions rust/otap-dataflow/crates/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl InternalTelemetrySystem {
config: &TelemetryConfig,
telemetry_registry: TelemetryRegistryHandle,
console_async_reporter: Option<ObservedEventReporter>,
logging_send_policy: SendPolicy,
context_fn: LogContextFn,
log_tap_handle: Option<log_tap::InternalLogTapHandle>,
) -> Result<Self, Error> {
Expand Down Expand Up @@ -261,10 +262,10 @@ impl InternalTelemetrySystem {
let (its_reporter, its_settings) = if config.logs.providers.uses_its_provider() {
let (sender, logs_receiver) = flume::bounded(config.reporting_channel_size);
let reporter = if let Some(log_tap) = &log_tap_handle {
ObservedEventReporter::new(SendPolicy::default(), sender)
ObservedEventReporter::new(logging_send_policy.clone(), sender)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - Could we add one regression test for this path with ProviderMode::ITS and console_fallback: false?

.with_drop_counter(log_tap.ingest_drop_counter())
} else {
ObservedEventReporter::new(SendPolicy::default(), sender)
ObservedEventReporter::new(logging_send_policy.clone(), sender)
};
let resource_bytes = otel_sdk::encode_resource_bytes(&config.resource);
(
Expand Down Expand Up @@ -423,6 +424,7 @@ impl Default for InternalTelemetrySystem {
&config,
TelemetryRegistryHandle::new(),
Some(dummy_reporter),
SendPolicy::default(),
LogContext::new,
None,
)
Expand Down Expand Up @@ -468,6 +470,7 @@ mod tests {
&TelemetryConfig::default(),
TelemetryRegistryHandle::new(),
Some(test_reporter()),
SendPolicy::default(),
LogContext::new,
None,
)
Expand All @@ -488,6 +491,7 @@ mod tests {
&config_with_providers(providers),
TelemetryRegistryHandle::new(),
Some(test_reporter()),
SendPolicy::default(),
LogContext::new,
None,
)
Expand Down Expand Up @@ -529,6 +533,7 @@ mod tests {
&config,
TelemetryRegistryHandle::new(),
Some(test_reporter()),
SendPolicy::default(),
LogContext::new,
None,
)
Expand Down
Loading