From e72fcaf92953e12a5e8f68226155f26f78357c79 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 17 Oct 2025 12:50:49 -0400 Subject: [PATCH 01/13] feat(internal_logs source): turn ON rate limiting by default --- Cargo.lock | 47 +++++++++++++++++++++++++++++--- lib/tracing-limit/Cargo.toml | 1 + lib/tracing-limit/src/lib.rs | 52 +++++++++++++++++++++++------------- 3 files changed, 78 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index faeb44eef1e6d..82386342d1618 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4875,7 +4875,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.10", "tokio", "tower-service", "tracing 0.1.41", @@ -8061,7 +8061,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.11.0", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -8107,7 +8107,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2 1.0.101", "quote 1.0.40", "syn 2.0.106", @@ -9439,6 +9439,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "scc" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc" +dependencies = [ + "sdd", +] + [[package]] name = "schannel" version = "0.1.27" @@ -9494,6 +9503,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sdd" +version = "3.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" + [[package]] name = "seahash" version = "4.1.0" @@ -9803,6 +9818,31 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "serial_test" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9" +dependencies = [ + "futures 0.3.31", + "log", + "once_cell", + "parking_lot 0.12.4", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" +dependencies = [ + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", +] + [[package]] name = "sha-1" version = "0.10.1" @@ -11445,6 +11485,7 @@ dependencies = [ "criterion", "dashmap", "mock_instant", + "serial_test", "tracing 0.1.41", "tracing-core 0.1.33", "tracing-subscriber", diff --git a/lib/tracing-limit/Cargo.toml b/lib/tracing-limit/Cargo.toml index e6252ea3e6b45..5baef4a14f6f1 100644 --- a/lib/tracing-limit/Cargo.toml +++ b/lib/tracing-limit/Cargo.toml @@ -16,6 +16,7 @@ criterion = "0.7" tracing = "0.1.34" mock_instant = { version = "0.6" } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } +serial_test = { version = "3.2" } [[bench]] name = "limit" diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index 81bea3dc33485..b88531ecf6820 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -124,12 +124,12 @@ where } fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { - // Visit the event, grabbing the limit status if one is defined. If we can't find a rate limit field, or the rate limit - // is set as false, then we let it pass through untouched. + // Visit the event, grabbing the limit status if one is defined. Rate limiting is ON by default + // unless explicitly disabled by setting `internal_log_rate_limit = false`. let mut limit_visitor = LimitVisitor::default(); event.record(&mut limit_visitor); - let limit_exists = limit_visitor.limit.unwrap_or(false); + let limit_exists = limit_visitor.limit.unwrap_or(true); if !limit_exists { return self.inner.on_event(event, ctx); } @@ -445,15 +445,14 @@ impl Visit for MessageVisitor { #[cfg(test)] mod test { use std::{ - sync::{Arc, LazyLock, Mutex}, + sync::{Arc, Mutex}, time::Duration, }; use mock_instant::global::MockClock; + use serial_test::serial; use tracing_subscriber::layer::SubscriberExt; - static TRACING_DEFAULT_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); - use super::*; #[derive(Default)] @@ -495,9 +494,8 @@ mod test { } #[test] + #[serial] fn rate_limits() { - let _guard = TRACING_DEFAULT_LOCK.lock().unwrap(); - let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); @@ -505,7 +503,7 @@ mod test { .with(RateLimitedLayer::new(recorder).with_default_limit(1)); tracing::subscriber::with_default(sub, || { for _ in 0..21 { - info!(message = "Hello world!", internal_log_rate_limit = true); + info!(message = "Hello world!"); MockClock::advance(Duration::from_millis(100)); } }); @@ -530,9 +528,8 @@ mod test { } #[test] + #[serial] fn override_rate_limit_at_callsite() { - let _guard = TRACING_DEFAULT_LOCK.lock().unwrap(); - let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); @@ -542,7 +539,6 @@ mod test { for _ in 0..21 { info!( message = "Hello world!", - internal_log_rate_limit = true, internal_log_rate_secs = 1 ); MockClock::advance(Duration::from_millis(100)); @@ -569,9 +565,8 @@ mod test { } #[test] + #[serial] fn rate_limit_by_span_key() { - let _guard = TRACING_DEFAULT_LOCK.lock().unwrap(); - let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); @@ -585,8 +580,7 @@ mod test { info_span!("span", component_id = &key, vrl_position = &line_number); let _enter = span.enter(); info!( - message = format!("Hello {key} on line_number {line_number}!").as_str(), - internal_log_rate_limit = true + message = format!("Hello {key} on line_number {line_number}!").as_str() ); } } @@ -635,9 +629,8 @@ mod test { } #[test] + #[serial] fn rate_limit_by_event_key() { - let _guard = TRACING_DEFAULT_LOCK.lock().unwrap(); - let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); @@ -649,7 +642,6 @@ mod test { for line_number in &[1, 2] { info!( message = format!("Hello {key} on line_number {line_number}!").as_str(), - internal_log_rate_limit = true, component_id = &key, vrl_position = &line_number ); @@ -698,4 +690,26 @@ mod test { .collect::>() ); } + + #[test] + #[serial] + fn disabled_rate_limit() { + let events: Arc>> = Default::default(); + + let recorder = RecordingLayer::new(Arc::clone(&events)); + let sub = tracing_subscriber::registry::Registry::default() + .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + tracing::subscriber::with_default(sub, || { + for _ in 0..21 { + info!(message = "Hello world!", internal_log_rate_limit = false); + MockClock::advance(Duration::from_millis(100)); + } + }); + + let events = events.lock().unwrap(); + + // All 21 events should be emitted since rate limiting is disabled + assert_eq!(events.len(), 21); + assert!(events.iter().all(|e| e == "Hello world!")); + } } From c2f275780b948c922d4d232a215566b58cc266c0 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 17 Oct 2025 12:52:07 -0400 Subject: [PATCH 02/13] make fmt --- lib/tracing-limit/src/lib.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index b88531ecf6820..ccb9f2030784f 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -537,10 +537,7 @@ mod test { .with(RateLimitedLayer::new(recorder).with_default_limit(100)); tracing::subscriber::with_default(sub, || { for _ in 0..21 { - info!( - message = "Hello world!", - internal_log_rate_secs = 1 - ); + info!(message = "Hello world!", internal_log_rate_secs = 1); MockClock::advance(Duration::from_millis(100)); } }); From 7c08da2d4582aee65d8ef23720f4134c156e2ef7 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 17 Oct 2025 13:53:20 -0400 Subject: [PATCH 03/13] wip --- lib/tracing-limit/src/lib.rs | 270 +++++++++++++++++++++++++++++------ src/cli.rs | 16 ++- 2 files changed, 239 insertions(+), 47 deletions(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index ccb9f2030784f..63c127cbab21f 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use std::fmt; +use std::{collections::BTreeMap, fmt}; use dashmap::DashMap; use tracing_core::{ @@ -62,6 +62,19 @@ where } } + /// Sets the default rate limit window in seconds. + /// + /// This controls how long logs are suppressed before they can be emitted again. + /// Within each window: + /// - 1st occurrence: Emitted normally + /// - 2nd occurrence: Shows "suppressing" warning + /// - 3rd+ occurrences: Silent until window expires + /// + /// # Examples + /// ```ignore + /// RateLimitedLayer::new(layer) + /// .with_default_limit(10) // 10-second windows + /// ``` pub fn with_default_limit(mut self, internal_log_rate_limit: u64) -> Self { self.internal_log_rate_limit = internal_log_rate_limit; self @@ -350,14 +363,29 @@ impl From for TraceValue { struct RateLimitedSpanKeys { component_id: Option, vrl_position: Option, + other_fields: BTreeMap, } impl RateLimitedSpanKeys { fn record(&mut self, field: &Field, value: TraceValue) { - match field.name() { + let field_name = field.name(); + + // Skip internal rate limiting control fields + if field_name == RATE_LIMIT_FIELD + || field_name == RATE_LIMIT_SECS_FIELD + || field_name == MESSAGE_FIELD + { + return; + } + + // Track known semantic fields explicitly + match field_name { COMPONENT_ID_FIELD => self.component_id = Some(value), VRL_POSITION => self.vrl_position = Some(value), - _ => {} + // Everything else goes into the catch-all bucket + _ => { + self.other_fields.insert(field_name.to_string(), value); + } } } @@ -368,6 +396,10 @@ impl RateLimitedSpanKeys { if let Some(vrl_position) = &other.vrl_position { self.vrl_position = Some(vrl_position.clone()); } + // Merge other fields, with 'other' taking precedence + for (key, value) in &other.other_fields { + self.other_fields.insert(key.clone(), value.clone()); + } } } @@ -455,6 +487,61 @@ mod test { use super::*; + #[derive(Default)] + struct AllFieldsVisitor { + fields: BTreeMap, + } + + impl Visit for AllFieldsVisitor { + fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { + self.fields + .insert(field.name().to_string(), format!("{value:?}")); + } + + fn record_str(&mut self, field: &Field, value: &str) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_i64(&mut self, field: &Field, value: i64) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_u64(&mut self, field: &Field, value: u64) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_bool(&mut self, field: &Field, value: bool) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + } + + impl AllFieldsVisitor { + fn format(&self) -> String { + let mut parts: Vec = Vec::new(); + + // Always show message first if present + if let Some(msg) = self.fields.get("message") { + parts.push(msg.clone()); + } + + // Then show other fields in sorted order + for (key, value) in &self.fields { + if key != "message" + && key != "internal_log_rate_limit" + && key != "internal_log_rate_secs" + { + parts.push(format!("{key}={value}")); + } + } + + parts.join(" ") + } + } + #[derive(Default)] struct RecordingLayer { events: Arc>>, @@ -484,12 +571,45 @@ mod test { true } - fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { - let mut visitor = MessageVisitor::default(); + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + let mut visitor = AllFieldsVisitor::default(); event.record(&mut visitor); + // Also capture fields from span context + if let Some(span) = ctx.lookup_current() { + for span_ref in span.scope().from_root() { + let extensions = span_ref.extensions(); + if let Some(span_keys) = extensions.get::() { + // Add component_id + if let Some(TraceValue::String(ref s)) = span_keys.component_id { + visitor.fields.insert("component_id".to_string(), s.clone()); + } + // Add vrl_position + if let Some(val) = &span_keys.vrl_position { + let formatted = match val { + TraceValue::String(s) => s.clone(), + TraceValue::Int(i) => i.to_string(), + TraceValue::Uint(u) => u.to_string(), + TraceValue::Bool(b) => b.to_string(), + }; + visitor.fields.insert("vrl_position".to_string(), formatted); + } + // Add other_fields + for (key, value) in &span_keys.other_fields { + let formatted = match value { + TraceValue::String(s) => s.clone(), + TraceValue::Int(i) => i.to_string(), + TraceValue::Uint(u) => u.to_string(), + TraceValue::Bool(b) => b.to_string(), + }; + visitor.fields.insert(key.clone(), formatted); + } + } + } + } + let mut events = self.events.lock().unwrap(); - events.push(visitor.message.unwrap_or_default()); + events.push(visitor.format()); } } @@ -590,34 +710,34 @@ mod test { assert_eq!( *events, vec![ - "Hello foo on line_number 1!", - "Hello foo on line_number 2!", - "Hello bar on line_number 1!", - "Hello bar on line_number 2!", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1!", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2!", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1!", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2!", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1!", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2!", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1!", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2!", + "Hello foo on line_number 1! component_id=foo vrl_position=1", + "Hello foo on line_number 2! component_id=foo vrl_position=2", + "Hello bar on line_number 1! component_id=bar vrl_position=1", + "Hello bar on line_number 2! component_id=bar vrl_position=2", + "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding. component_id=foo vrl_position=1", + "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding. component_id=foo vrl_position=2", + "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding. component_id=bar vrl_position=1", + "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding. component_id=bar vrl_position=2", + "Internal log [Hello foo on line_number 1!] has been suppressed 9 times. component_id=foo vrl_position=1", + "Hello foo on line_number 1! component_id=foo vrl_position=1", + "Internal log [Hello foo on line_number 2!] has been suppressed 9 times. component_id=foo vrl_position=2", + "Hello foo on line_number 2! component_id=foo vrl_position=2", + "Internal log [Hello bar on line_number 1!] has been suppressed 9 times. component_id=bar vrl_position=1", + "Hello bar on line_number 1! component_id=bar vrl_position=1", + "Internal log [Hello bar on line_number 2!] has been suppressed 9 times. component_id=bar vrl_position=2", + "Hello bar on line_number 2! component_id=bar vrl_position=2", + "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding. component_id=foo vrl_position=1", + "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding. component_id=foo vrl_position=2", + "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding. component_id=bar vrl_position=1", + "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding. component_id=bar vrl_position=2", + "Internal log [Hello foo on line_number 1!] has been suppressed 9 times. component_id=foo vrl_position=1", + "Hello foo on line_number 1! component_id=foo vrl_position=1", + "Internal log [Hello foo on line_number 2!] has been suppressed 9 times. component_id=foo vrl_position=2", + "Hello foo on line_number 2! component_id=foo vrl_position=2", + "Internal log [Hello bar on line_number 1!] has been suppressed 9 times. component_id=bar vrl_position=1", + "Hello bar on line_number 1! component_id=bar vrl_position=1", + "Internal log [Hello bar on line_number 2!] has been suppressed 9 times. component_id=bar vrl_position=2", + "Hello bar on line_number 2! component_id=bar vrl_position=2", ] .into_iter() .map(std::borrow::ToOwned::to_owned) @@ -653,34 +773,34 @@ mod test { assert_eq!( *events, vec![ - "Hello foo on line_number 1!", - "Hello foo on line_number 2!", - "Hello bar on line_number 1!", - "Hello bar on line_number 2!", + "Hello foo on line_number 1! component_id=foo vrl_position=1", + "Hello foo on line_number 2! component_id=foo vrl_position=2", + "Hello bar on line_number 1! component_id=bar vrl_position=1", + "Hello bar on line_number 2! component_id=bar vrl_position=2", "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1!", + "Hello foo on line_number 1! component_id=foo vrl_position=1", "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2!", + "Hello foo on line_number 2! component_id=foo vrl_position=2", "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1!", + "Hello bar on line_number 1! component_id=bar vrl_position=1", "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2!", + "Hello bar on line_number 2! component_id=bar vrl_position=2", "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1!", + "Hello foo on line_number 1! component_id=foo vrl_position=1", "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2!", + "Hello foo on line_number 2! component_id=foo vrl_position=2", "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1!", + "Hello bar on line_number 1! component_id=bar vrl_position=1", "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2!", + "Hello bar on line_number 2! component_id=bar vrl_position=2", ] .into_iter() .map(std::borrow::ToOwned::to_owned) @@ -709,4 +829,62 @@ mod test { assert_eq!(events.len(), 21); assert!(events.iter().all(|e| e == "Hello world!")); } + + #[test] + #[serial] + fn rate_limit_by_other_fields() { + // This test demonstrates the fix for the bug where logs with different + // contextual fields (like fanout_id, output_id, etc.) were being incorrectly + // suppressed together. The fix uses a catch-all BTreeMap to ensure logs with + // different field values create separate rate limit buckets. + let events: Arc>> = Default::default(); + + let recorder = RecordingLayer::new(Arc::clone(&events)); + let sub = tracing_subscriber::registry::Registry::default() + .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + tracing::subscriber::with_default(sub, || { + for _ in 0..21 { + // Same component_id but different fanout_id values should be rate limited independently + info!( + message = "Configuring outputs for source.", + component_id = "demo_logs_1", + fanout_id = "input_1" + ); + info!( + message = "Configuring outputs for source.", + component_id = "demo_logs_1", + fanout_id = "input_2" + ); + MockClock::advance(Duration::from_millis(100)); + } + }); + + let events = events.lock().unwrap(); + + // Each unique combination of fields should be rate limited independently + // So we should see both logs emit initially, then both show suppression warnings + // The output now clearly shows the component_id and fanout_id values + assert_eq!( + *events, + vec![ + "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_1", + "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_2", + "Internal log [Configuring outputs for source.] is being suppressed to avoid flooding.", + "Internal log [Configuring outputs for source.] is being suppressed to avoid flooding.", + "Internal log [Configuring outputs for source.] has been suppressed 9 times.", + "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_1", + "Internal log [Configuring outputs for source.] has been suppressed 9 times.", + "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_2", + "Internal log [Configuring outputs for source.] is being suppressed to avoid flooding.", + "Internal log [Configuring outputs for source.] is being suppressed to avoid flooding.", + "Internal log [Configuring outputs for source.] has been suppressed 9 times.", + "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_1", + "Internal log [Configuring outputs for source.] has been suppressed 9 times.", + "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_2", + ] + .into_iter() + .map(std::borrow::ToOwned::to_owned) + .collect::>() + ); + } } diff --git a/src/cli.rs b/src/cli.rs index 6fc5ede473849..73c9423b2265f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -180,7 +180,21 @@ pub struct RootOpts { )] pub watch_config_poll_interval_seconds: NonZeroU64, - /// Set the internal log rate limit + /// Set the internal log rate limit in seconds. + /// + /// This controls the time window for rate limiting Vector's own internal logs. + /// Within each time window, the first occurrence of a log is emitted, the second + /// shows a suppression warning, and subsequent occurrences are silent until the + /// window expires. + /// + /// Logs are grouped by their location in the code and contextual fields (like + /// component_id, fanout_id, etc.), so different log instances are rate limited + /// independently. + /// + /// Examples: + /// - 1: Very verbose, logs can repeat every second + /// - 10 (default): Logs can repeat every 10 seconds + /// - 60: Less verbose, logs can repeat every minute #[arg( short, long, From 1ad0df96fed1384eda67ffe9b1a6cfda6c58ad85 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 17 Oct 2025 14:28:13 -0400 Subject: [PATCH 04/13] preserve master groupping behavior --- docs/DEVELOPING.md | 23 ++ lib/tracing-limit/src/lib.rs | 659 +++++++++++++++++++++++++---------- src/cli.rs | 16 +- 3 files changed, 492 insertions(+), 206 deletions(-) diff --git a/docs/DEVELOPING.md b/docs/DEVELOPING.md index ed459429ef6e6..f32965244c28f 100644 --- a/docs/DEVELOPING.md +++ b/docs/DEVELOPING.md @@ -14,6 +14,7 @@ - [Minimum Supported Rust Version](#minimum-supported-rust-version) - [Guidelines](#guidelines) - [Sink healthchecks](#sink-healthchecks) + - [Disabling internal log rate limiting](#disabling-internal-log-rate-limiting) - [Testing](#testing) - [Unit tests](#unit-tests) - [Integration tests](#integration-tests) @@ -328,6 +329,28 @@ that fall into a false negative circumstance. Our goal should be to minimize the likelihood of users needing to pull that lever while still making a good effort to detect common problems. +### Disabling internal log rate limiting + +Vector rate limits its own internal logs by default (10-second windows). During development, you may want to see all log occurrences. + +**Globally** (CLI flag or environment variable): + +```bash +vector --config vector.yaml -r 1 +# or +VECTOR_INTERNAL_LOG_RATE_LIMIT=1 vector --config vector.yaml +``` + +**Per log statement**: + +```rust +// Disable rate limiting for this log +warn!(message = "Error occurred.", %error, internal_log_rate_limit = false); + +// Override rate limit window to 1 second +info!(message = "Processing batch.", batch_size, internal_log_rate_secs = 1); +``` + ## Testing Testing is very important since Vector's primary design principle is reliability. diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index 63c127cbab21f..abb6601eb9aab 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -1,6 +1,110 @@ #![deny(warnings)] - -use std::{collections::BTreeMap, fmt}; +//! Rate limiting for tracing events. +//! +//! This crate provides a tracing-subscriber layer that rate limits log events to prevent +//! log flooding. Events are grouped by their callsite and contextual fields, with each +//! unique combination rate limited independently. +//! +//! # How it works +//! +//! Within each rate limit window (default 10 seconds): +//! - **1st occurrence**: Event is emitted normally +//! - **2nd occurrence**: Emits a "suppressing" warning +//! - **3rd+ occurrences**: Silent until window expires +//! - **After window**: Emits a summary of suppressed count, then next event normally +//! +//! # Rate limit grouping +//! +//! Events are rate limited independently based on a combination of: +//! - **Callsite**: The code location where the log statement appears +//! - **Contextual fields**: Any fields attached to the event or its parent spans +//! +//! ## How fields contribute to grouping +//! +//! **Only these fields create distinct rate limit groups:** +//! - `component_id` - Different components are rate limited independently +//! - `vrl_position` - Different VRL script positions are rate limited independently +//! +//! **All other fields are ignored for grouping**, including: +//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid cardinality issues +//! - `message` - The log message itself doesn't differentiate groups +//! - `internal_log_rate_limit` - Control field for enabling/disabling rate limiting +//! - `internal_log_rate_secs` - Control field for customizing the rate limit window +//! - Any custom fields you add +//! +//! This restrictive approach prevents high-cardinality fields (like request IDs or UUIDs) from +//! causing unbounded memory growth. +//! +//! ## Examples +//! +//! ```rust,ignore +//! // Example 1: Different component_id values create separate rate limit groups +//! info!(component_id = "transform_1", "Processing event"); // Group A +//! info!(component_id = "transform_2", "Processing event"); // Group B +//! // Even though the message is identical, these are rate limited independently +//! +//! // Example 2: Only component_id and vrl_position matter for grouping +//! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C +//! info!(component_id = "router", fanout_id = "output_2", "Routing event"); // Group C (same group!) +//! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C (same group!) +//! info!(component_id = "router", fanout_id = "output_1", input_id = "kafka", "Routing event"); // Group C (same!) +//! // All of these share the same group because they have the same component_id +//! // The fanout_id and input_id fields are ignored to prevent memory issues +//! +//! // Example 3: Span fields contribute to grouping +//! let span = info_span!("process", component_id = "transform_1"); +//! let _enter = span.enter(); +//! info!("Processing event"); // Group E: callsite + component_id from span +//! drop(_enter); +//! +//! let span = info_span!("process", component_id = "transform_2"); +//! let _enter = span.enter(); +//! info!("Processing event"); // Group F: same callsite but different component_id +//! +//! // Example 4: Nested spans - child span fields take precedence +//! let outer = info_span!("outer", component_id = "parent"); +//! let _outer_guard = outer.enter(); +//! let inner = info_span!("inner", component_id = "child"); +//! let _inner_guard = inner.enter(); +//! info!("Nested event"); // Grouped by component_id = "child" +//! +//! // Example 5: Same callsite with no fields = single rate limit group +//! info!("Simple message"); // Group G +//! info!("Simple message"); // Group G (same group, will be rate limited together) +//! info!("Simple message"); // Group G +//! +//! // Example 6: VRL position tracking for script errors +//! info!(vrl_position = "line 10", "VRL compilation error"); // Group H +//! info!(vrl_position = "line 25", "VRL compilation error"); // Group I +//! // Each line in a VRL script gets its own rate limit group +//! +//! // Example 7: Custom fields are ignored for grouping +//! info!(component_id = "source", input_id = "in_1", "Received data"); // Group J +//! info!(component_id = "source", input_id = "in_2", "Received data"); // Group J (same group!) +//! // The input_id field is ignored - only component_id matters +//! +//! // Example 8: Disabling rate limiting for specific logs +//! // Rate limiting is ON by default - explicitly disable for important logs +//! warn!( +//! component_id = "critical_component", +//! message = "Fatal error occurred", +//! internal_log_rate_limit = false +//! ); +//! // This event will NEVER be rate limited, regardless of how often it fires +//! +//! // Example 9: Custom rate limit window for specific events +//! info!( +//! component_id = "noisy_component", +//! message = "Frequent status update", +//! internal_log_rate_secs = 60 // Only log once per minute +//! ); +//! // Override the default window for this specific log +//! ``` +//! +//! This ensures logs from different components and VRL positions are rate limited independently, +//! while preventing memory issues from high-cardinality fields. + +use std::fmt; use dashmap::DashMap; use tracing_core::{ @@ -69,12 +173,6 @@ where /// - 1st occurrence: Emitted normally /// - 2nd occurrence: Shows "suppressing" warning /// - 3rd+ occurrences: Silent until window expires - /// - /// # Examples - /// ```ignore - /// RateLimitedLayer::new(layer) - /// .with_default_limit(10) // 10-second windows - /// ``` pub fn with_default_limit(mut self, internal_log_rate_limit: u64) -> Self { self.internal_log_rate_limit = internal_log_rate_limit; self @@ -152,13 +250,25 @@ where None => self.internal_log_rate_limit, }; - // Visit all of the spans in the scope of this event, looking for specific fields that we use to differentiate - // rate-limited events. This ensures that we don't rate limit an event's _callsite_, but the specific usage of a - // callsite, since multiple copies of the same component could be running, etc. + // Build a composite key from event fields and span context to determine the rate limit group. + // This multi-step process ensures we capture all relevant contextual information: + // + // 1. Start with event-level fields (e.g., fields directly on the log macro call) + // 2. Walk up the span hierarchy from root to current span + // 3. Merge in fields from each span, with child spans taking precedence + // + // This means an event's rate limit group is determined by the combination of: + // - Its callsite (handled separately via RateKeyIdentifier) + // - All contextual fields from both the event and its span ancestry + // + // Example: The same `info!("msg")` callsite in different component contexts becomes + // distinct rate limit groups, allowing fine-grained control over log flooding. let rate_limit_key_values = { let mut keys = RateLimitedSpanKeys::default(); + // Capture fields directly on this event event.record(&mut keys); + // Walk span hierarchy and merge in contextual fields ctx.lookup_current() .into_iter() .flat_map(|span| span.scope().from_root()) @@ -332,6 +442,18 @@ enum TraceValue { Bool(bool), } +#[cfg(test)] +impl fmt::Display for TraceValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TraceValue::String(s) => write!(f, "{}", s), + TraceValue::Int(i) => write!(f, "{}", i), + TraceValue::Uint(u) => write!(f, "{}", u), + TraceValue::Bool(b) => write!(f, "{}", b), + } + } +} + impl From for TraceValue { fn from(b: bool) -> Self { TraceValue::Bool(b) @@ -356,36 +478,44 @@ impl From for TraceValue { } } -/// RateLimitedSpanKeys records span keys that we use to rate limit callsites separately by. For -/// example, if a given trace callsite is called from two different components, then they will be -/// rate limited separately. +/// RateLimitedSpanKeys records span and event fields that differentiate rate limit groups. +/// +/// This struct is used to build a composite key that uniquely identifies a rate limit bucket. +/// Events with different field values will be rate limited independently, even if they come +/// from the same callsite. +/// +/// ## Field categories: +/// +/// **Tracked fields** (only these create distinct rate limit groups): +/// - `component_id` - Different components are rate limited independently +/// - `vrl_position` - Different VRL script positions are rate limited independently +/// +/// **Ignored fields**: All other fields (including `fanout_id`, `input_id`, etc.) are ignored +/// for grouping purposes. This prevents high-cardinality fields from causing memory issues. +/// +/// ## Example: +/// +/// ```rust,ignore +/// // These create separate groups (different component_id): +/// info!(component_id = "source_1", "Event received"); // Group A +/// info!(component_id = "source_2", "Event received"); // Group B +/// +/// // These share the same group (same component_id, fanout_id is ignored): +/// info!(component_id = "router", fanout_id = "output_1", "Routing"); // Group C +/// info!(component_id = "router", fanout_id = "output_2", "Routing"); // Group C (same!) +/// ``` #[derive(Default, Eq, PartialEq, Hash, Clone)] struct RateLimitedSpanKeys { component_id: Option, vrl_position: Option, - other_fields: BTreeMap, } impl RateLimitedSpanKeys { fn record(&mut self, field: &Field, value: TraceValue) { - let field_name = field.name(); - - // Skip internal rate limiting control fields - if field_name == RATE_LIMIT_FIELD - || field_name == RATE_LIMIT_SECS_FIELD - || field_name == MESSAGE_FIELD - { - return; - } - - // Track known semantic fields explicitly - match field_name { + match field.name() { COMPONENT_ID_FIELD => self.component_id = Some(value), VRL_POSITION => self.vrl_position = Some(value), - // Everything else goes into the catch-all bucket - _ => { - self.other_fields.insert(field_name.to_string(), value); - } + _ => {} } } @@ -396,10 +526,6 @@ impl RateLimitedSpanKeys { if let Some(vrl_position) = &other.vrl_position { self.vrl_position = Some(vrl_position.clone()); } - // Merge other fields, with 'other' taking precedence - for (key, value) in &other.other_fields { - self.other_fields.insert(key.clone(), value.clone()); - } } } @@ -477,6 +603,7 @@ impl Visit for MessageVisitor { #[cfg(test)] mod test { use std::{ + collections::BTreeMap, sync::{Arc, Mutex}, time::Duration, }; @@ -487,6 +614,41 @@ mod test { use super::*; + #[derive(Debug, Clone, PartialEq, Eq)] + struct RecordedEvent { + message: String, + fields: BTreeMap, + } + + impl RecordedEvent { + fn new(message: impl Into) -> Self { + Self { + message: message.into(), + fields: BTreeMap::new(), + } + } + + fn with_field(mut self, key: impl Into, value: impl Into) -> Self { + self.fields.insert(key.into(), value.into()); + self + } + } + + /// Macro to create RecordedEvent with optional fields + /// Usage: + /// - `event!("message")` - just message + /// - `event!("message", key1: "value1")` - message with one field + /// - `event!("message", key1: "value1", key2: "value2")` - message with multiple fields + macro_rules! event { + ($msg:expr) => { + RecordedEvent::new($msg) + }; + ($msg:expr, $($key:ident: $value:expr),+ $(,)?) => { + RecordedEvent::new($msg) + $(.with_field(stringify!($key), $value))+ + }; + } + #[derive(Default)] struct AllFieldsVisitor { fields: BTreeMap, @@ -520,37 +682,36 @@ mod test { } impl AllFieldsVisitor { - fn format(&self) -> String { - let mut parts: Vec = Vec::new(); - - // Always show message first if present - if let Some(msg) = self.fields.get("message") { - parts.push(msg.clone()); - } - - // Then show other fields in sorted order - for (key, value) in &self.fields { + fn into_event(self) -> RecordedEvent { + let message = self + .fields + .get("message") + .cloned() + .unwrap_or_else(|| String::from("")); + + let mut fields = BTreeMap::new(); + for (key, value) in self.fields { if key != "message" && key != "internal_log_rate_limit" && key != "internal_log_rate_secs" { - parts.push(format!("{key}={value}")); + fields.insert(key, value); } } - parts.join(" ") + RecordedEvent { message, fields } } } #[derive(Default)] struct RecordingLayer { - events: Arc>>, + events: Arc>>, _subscriber: std::marker::PhantomData, } impl RecordingLayer { - fn new(events: Arc>>) -> Self { + fn new(events: Arc>>) -> Self { RecordingLayer { events, @@ -586,37 +747,23 @@ mod test { } // Add vrl_position if let Some(val) = &span_keys.vrl_position { - let formatted = match val { - TraceValue::String(s) => s.clone(), - TraceValue::Int(i) => i.to_string(), - TraceValue::Uint(u) => u.to_string(), - TraceValue::Bool(b) => b.to_string(), - }; - visitor.fields.insert("vrl_position".to_string(), formatted); - } - // Add other_fields - for (key, value) in &span_keys.other_fields { - let formatted = match value { - TraceValue::String(s) => s.clone(), - TraceValue::Int(i) => i.to_string(), - TraceValue::Uint(u) => u.to_string(), - TraceValue::Bool(b) => b.to_string(), - }; - visitor.fields.insert(key.clone(), formatted); + visitor + .fields + .insert("vrl_position".to_string(), val.to_string()); } } } } let mut events = self.events.lock().unwrap(); - events.push(visitor.format()); + events.push(visitor.into_event()); } } #[test] #[serial] fn rate_limits() { - let events: Arc>> = Default::default(); + let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() @@ -633,24 +780,21 @@ mod test { assert_eq!( *events, vec![ - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - "Internal log [Hello world!] has been suppressed 9 times.", - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - "Internal log [Hello world!] has been suppressed 9 times.", - "Hello world!", + event!("Hello world!"), + event!("Internal log [Hello world!] is being suppressed to avoid flooding."), + event!("Internal log [Hello world!] has been suppressed 9 times."), + event!("Hello world!"), + event!("Internal log [Hello world!] is being suppressed to avoid flooding."), + event!("Internal log [Hello world!] has been suppressed 9 times."), + event!("Hello world!"), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } #[test] #[serial] fn override_rate_limit_at_callsite() { - let events: Arc>> = Default::default(); + let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() @@ -667,24 +811,21 @@ mod test { assert_eq!( *events, vec![ - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - "Internal log [Hello world!] has been suppressed 9 times.", - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - "Internal log [Hello world!] has been suppressed 9 times.", - "Hello world!", + event!("Hello world!"), + event!("Internal log [Hello world!] is being suppressed to avoid flooding."), + event!("Internal log [Hello world!] has been suppressed 9 times."), + event!("Hello world!"), + event!("Internal log [Hello world!] is being suppressed to avoid flooding."), + event!("Internal log [Hello world!] has been suppressed 9 times."), + event!("Hello world!"), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } #[test] #[serial] fn rate_limit_by_span_key() { - let events: Arc>> = Default::default(); + let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() @@ -710,45 +851,42 @@ mod test { assert_eq!( *events, vec![ - "Hello foo on line_number 1! component_id=foo vrl_position=1", - "Hello foo on line_number 2! component_id=foo vrl_position=2", - "Hello bar on line_number 1! component_id=bar vrl_position=1", - "Hello bar on line_number 2! component_id=bar vrl_position=2", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding. component_id=foo vrl_position=1", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding. component_id=foo vrl_position=2", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding. component_id=bar vrl_position=1", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding. component_id=bar vrl_position=2", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times. component_id=foo vrl_position=1", - "Hello foo on line_number 1! component_id=foo vrl_position=1", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times. component_id=foo vrl_position=2", - "Hello foo on line_number 2! component_id=foo vrl_position=2", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times. component_id=bar vrl_position=1", - "Hello bar on line_number 1! component_id=bar vrl_position=1", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times. component_id=bar vrl_position=2", - "Hello bar on line_number 2! component_id=bar vrl_position=2", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding. component_id=foo vrl_position=1", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding. component_id=foo vrl_position=2", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding. component_id=bar vrl_position=1", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding. component_id=bar vrl_position=2", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times. component_id=foo vrl_position=1", - "Hello foo on line_number 1! component_id=foo vrl_position=1", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times. component_id=foo vrl_position=2", - "Hello foo on line_number 2! component_id=foo vrl_position=2", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times. component_id=bar vrl_position=1", - "Hello bar on line_number 1! component_id=bar vrl_position=1", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times. component_id=bar vrl_position=2", - "Hello bar on line_number 2! component_id=bar vrl_position=2", + event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), + event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), + event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), + event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), + event!("Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", component_id: "foo", vrl_position: "1"), + event!("Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", component_id: "foo", vrl_position: "2"), + event!("Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", component_id: "bar", vrl_position: "1"), + event!("Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", component_id: "bar", vrl_position: "2"), + event!("Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", component_id: "foo", vrl_position: "1"), + event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), + event!("Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", component_id: "foo", vrl_position: "2"), + event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), + event!("Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", component_id: "bar", vrl_position: "1"), + event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), + event!("Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", component_id: "bar", vrl_position: "2"), + event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), + event!("Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", component_id: "foo", vrl_position: "1"), + event!("Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", component_id: "foo", vrl_position: "2"), + event!("Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", component_id: "bar", vrl_position: "1"), + event!("Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", component_id: "bar", vrl_position: "2"), + event!("Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", component_id: "foo", vrl_position: "1"), + event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), + event!("Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", component_id: "foo", vrl_position: "2"), + event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), + event!("Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", component_id: "bar", vrl_position: "1"), + event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), + event!("Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", component_id: "bar", vrl_position: "2"), + event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } #[test] #[serial] fn rate_limit_by_event_key() { - let events: Arc>> = Default::default(); + let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() @@ -773,45 +911,58 @@ mod test { assert_eq!( *events, vec![ - "Hello foo on line_number 1! component_id=foo vrl_position=1", - "Hello foo on line_number 2! component_id=foo vrl_position=2", - "Hello bar on line_number 1! component_id=bar vrl_position=1", - "Hello bar on line_number 2! component_id=bar vrl_position=2", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1! component_id=foo vrl_position=1", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2! component_id=foo vrl_position=2", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1! component_id=bar vrl_position=1", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2! component_id=bar vrl_position=2", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1! component_id=foo vrl_position=1", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2! component_id=foo vrl_position=2", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1! component_id=bar vrl_position=1", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2! component_id=bar vrl_position=2", + event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), + event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), + event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), + event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), + event!( + "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding." + ), + event!( + "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding." + ), + event!( + "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding." + ), + event!( + "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding." + ), + event!("Internal log [Hello foo on line_number 1!] has been suppressed 9 times."), + event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), + event!("Internal log [Hello foo on line_number 2!] has been suppressed 9 times."), + event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), + event!("Internal log [Hello bar on line_number 1!] has been suppressed 9 times."), + event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), + event!("Internal log [Hello bar on line_number 2!] has been suppressed 9 times."), + event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), + event!( + "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding." + ), + event!( + "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding." + ), + event!( + "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding." + ), + event!( + "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding." + ), + event!("Internal log [Hello foo on line_number 1!] has been suppressed 9 times."), + event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), + event!("Internal log [Hello foo on line_number 2!] has been suppressed 9 times."), + event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), + event!("Internal log [Hello bar on line_number 1!] has been suppressed 9 times."), + event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), + event!("Internal log [Hello bar on line_number 2!] has been suppressed 9 times."), + event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } #[test] #[serial] fn disabled_rate_limit() { - let events: Arc>> = Default::default(); + let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() @@ -827,33 +978,26 @@ mod test { // All 21 events should be emitted since rate limiting is disabled assert_eq!(events.len(), 21); - assert!(events.iter().all(|e| e == "Hello world!")); + assert!(events.iter().all(|e| e == &event!("Hello world!"))); } #[test] #[serial] - fn rate_limit_by_other_fields() { - // This test demonstrates the fix for the bug where logs with different - // contextual fields (like fanout_id, output_id, etc.) were being incorrectly - // suppressed together. The fix uses a catch-all BTreeMap to ensure logs with - // different field values create separate rate limit buckets. - let events: Arc>> = Default::default(); + fn rate_limit_ignores_fanout_id() { + let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() .with(RateLimitedLayer::new(recorder).with_default_limit(1)); tracing::subscriber::with_default(sub, || { - for _ in 0..21 { - // Same component_id but different fanout_id values should be rate limited independently - info!( - message = "Configuring outputs for source.", - component_id = "demo_logs_1", - fanout_id = "input_1" - ); + for i in 0..21 { + // Call the SAME info! macro with different fanout_id values + // to verify that fanout_id doesn't create separate rate limit groups + let fanout = if i % 2 == 0 { "i1" } else { "i2" }; info!( - message = "Configuring outputs for source.", - component_id = "demo_logs_1", - fanout_id = "input_2" + message = "Test message.", + component_id = "c1", + fanout_id = fanout ); MockClock::advance(Duration::from_millis(100)); } @@ -861,30 +1005,163 @@ mod test { let events = events.lock().unwrap(); - // Each unique combination of fields should be rate limited independently - // So we should see both logs emit initially, then both show suppression warnings - // The output now clearly shows the component_id and fanout_id values + // All events share the same rate limit group (same callsite + component_id, fanout_id is ignored) + // Even though fanout_id alternates between i1 and i2, they're all in one group + assert_eq!( + *events, + vec![ + event!("Test message.", component_id: "c1", fanout_id: "i1"), + event!("Internal log [Test message.] is being suppressed to avoid flooding."), + event!("Internal log [Test message.] has been suppressed 9 times."), + event!("Test message.", component_id: "c1", fanout_id: "i1"), + event!("Internal log [Test message.] is being suppressed to avoid flooding."), + event!("Internal log [Test message.] has been suppressed 9 times."), + event!("Test message.", component_id: "c1", fanout_id: "i1"), + ] + ); + } + + #[test] + #[serial] + fn rate_limit_ignores_non_special_fields() { + let events: Arc>> = Default::default(); + + let recorder = RecordingLayer::new(Arc::clone(&events)); + let sub = tracing_subscriber::registry::Registry::default() + .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + tracing::subscriber::with_default(sub, || { + for i in 0..21 { + // Call the SAME info! macro multiple times per iteration with varying fanout_id + // to verify that fanout_id doesn't create separate rate limit groups + for _ in 0..3 { + let fanout = if i % 2 == 0 { "output_1" } else { "output_2" }; + info!( + message = "Routing event", + component_id = "router", + fanout_id = fanout + ); + } + MockClock::advance(Duration::from_millis(100)); + } + }); + + let events = events.lock().unwrap(); + + // All events share the same rate limit group (same callsite + component_id) + // First event emits normally, second shows suppression, third and beyond are silent + // until the window expires + assert_eq!( + *events, + vec![ + // First iteration - first emits, second shows suppression, 3rd+ silent + event!("Routing event", component_id: "router", fanout_id: "output_1"), + event!("Internal log [Routing event] is being suppressed to avoid flooding."), + // After rate limit window (1 sec) - summary shows suppressions + event!("Internal log [Routing event] has been suppressed 29 times."), + event!("Routing event", component_id: "router", fanout_id: "output_1"), + event!("Internal log [Routing event] is being suppressed to avoid flooding."), + event!("Internal log [Routing event] has been suppressed 29 times."), + event!("Routing event", component_id: "router", fanout_id: "output_1"), + event!("Internal log [Routing event] is being suppressed to avoid flooding."), + ] + ); + } + + #[test] + #[serial] + fn nested_spans_child_takes_precedence() { + let events: Arc>> = Default::default(); + + let recorder = RecordingLayer::new(Arc::clone(&events)); + let sub = tracing_subscriber::registry::Registry::default() + .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + tracing::subscriber::with_default(sub, || { + // Create nested spans where child overrides parent's component_id + let outer = info_span!("outer", component_id = "parent"); + let _outer_guard = outer.enter(); + + for _ in 0..21 { + // Inner span with different component_id should take precedence + let inner = info_span!("inner", component_id = "child"); + let _inner_guard = inner.enter(); + info!(message = "Nested event"); + drop(_inner_guard); + + MockClock::advance(Duration::from_millis(100)); + } + }); + + let events = events.lock().unwrap(); + + // All events should be grouped by component_id = "child" (from inner span) + // not "parent" (from outer span), demonstrating child precedence + assert_eq!( + *events, + vec![ + event!("Nested event", component_id: "child"), + event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"), + event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"), + event!("Nested event", component_id: "child"), + event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"), + event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"), + event!("Nested event", component_id: "child"), + ] + ); + } + + #[test] + #[serial] + fn nested_spans_merges_different_fields() { + let events: Arc>> = Default::default(); + + let recorder = RecordingLayer::new(Arc::clone(&events)); + let sub = tracing_subscriber::registry::Registry::default() + .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + tracing::subscriber::with_default(sub, || { + // Parent has component_id, child has vrl_position - both should be included + let outer = info_span!("outer", component_id = "transform"); + let _outer_guard = outer.enter(); + + for _ in 0..21 { + let inner = info_span!("inner", vrl_position = "line 42"); + let _inner_guard = inner.enter(); + info!(message = "VRL event"); + drop(_inner_guard); + + MockClock::advance(Duration::from_millis(100)); + } + }); + + let events = events.lock().unwrap(); + + // Events should have BOTH component_id from parent AND vrl_position from child assert_eq!( *events, vec![ - "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_1", - "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_2", - "Internal log [Configuring outputs for source.] is being suppressed to avoid flooding.", - "Internal log [Configuring outputs for source.] is being suppressed to avoid flooding.", - "Internal log [Configuring outputs for source.] has been suppressed 9 times.", - "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_1", - "Internal log [Configuring outputs for source.] has been suppressed 9 times.", - "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_2", - "Internal log [Configuring outputs for source.] is being suppressed to avoid flooding.", - "Internal log [Configuring outputs for source.] is being suppressed to avoid flooding.", - "Internal log [Configuring outputs for source.] has been suppressed 9 times.", - "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_1", - "Internal log [Configuring outputs for source.] has been suppressed 9 times.", - "Configuring outputs for source. component_id=demo_logs_1 fanout_id=input_2", + event!("VRL event", component_id: "transform", vrl_position: "line 42"), + event!( + "Internal log [VRL event] is being suppressed to avoid flooding.", + component_id: "transform", + vrl_position: "line 42" + ), + event!( + "Internal log [VRL event] has been suppressed 9 times.", + component_id: "transform", + vrl_position: "line 42" + ), + event!("VRL event", component_id: "transform", vrl_position: "line 42"), + event!( + "Internal log [VRL event] is being suppressed to avoid flooding.", + component_id: "transform", + vrl_position: "line 42" + ), + event!( + "Internal log [VRL event] has been suppressed 9 times.", + component_id: "transform", + vrl_position: "line 42" + ), + event!("VRL event", component_id: "transform", vrl_position: "line 42"), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } } diff --git a/src/cli.rs b/src/cli.rs index 73c9423b2265f..6fc5ede473849 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -180,21 +180,7 @@ pub struct RootOpts { )] pub watch_config_poll_interval_seconds: NonZeroU64, - /// Set the internal log rate limit in seconds. - /// - /// This controls the time window for rate limiting Vector's own internal logs. - /// Within each time window, the first occurrence of a log is emitted, the second - /// shows a suppression warning, and subsequent occurrences are silent until the - /// window expires. - /// - /// Logs are grouped by their location in the code and contextual fields (like - /// component_id, fanout_id, etc.), so different log instances are rate limited - /// independently. - /// - /// Examples: - /// - 1: Very verbose, logs can repeat every second - /// - 10 (default): Logs can repeat every 10 seconds - /// - 60: Less verbose, logs can repeat every minute + /// Set the internal log rate limit #[arg( short, long, From 1871245fe2d2e7fba678a7da5e26c51c1b66e9f7 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 17 Oct 2025 18:37:42 -0400 Subject: [PATCH 05/13] use 'component_id' and add changelog --- src/topology/builder.rs | 6 ++-- src/topology/running.rs | 62 ++++++++++++++++++++--------------------- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index c38a11c5ceba9..ff3eff5705484 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -229,7 +229,7 @@ impl<'a> Builder<'a> { .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)), ) { - debug!(component = %key, "Building new source."); + debug!(component_id = %key, "Building new source."); let typetag = source.inner.get_component_name(); let source_outputs = source.inner.outputs(self.config.schema.log_namespace()); @@ -429,7 +429,7 @@ impl<'a> Builder<'a> { .transforms() .filter(|(key, _)| self.diff.transforms.contains_new(key)) { - debug!(component = %key, "Building new transform."); + debug!(component_id = %key, "Building new transform."); let input_definitions = match schema::input_definitions( &transform.inputs, @@ -541,7 +541,7 @@ impl<'a> Builder<'a> { .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)), ) { - debug!(component = %key, "Building new sink."); + debug!(component_id = %key, "Building new sink."); let sink_inputs = &sink.inputs; let healthcheck = sink.healthcheck(); diff --git a/src/topology/running.rs b/src/topology/running.rs index cfedf92e7a4d9..fba759abc7afc 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -406,7 +406,7 @@ impl RunningTopology { let deadline = Instant::now() + timeout; for key in &diff.sources.to_remove { - debug!(component = %key, "Removing source."); + debug!(component_id = %key, "Removing source."); let previous = self.tasks.remove(key).unwrap(); drop(previous); // detach and forget @@ -417,7 +417,7 @@ impl RunningTopology { } for key in &diff.sources.to_change { - debug!(component = %key, "Changing source."); + debug!(component_id = %key, "Changing source."); self.remove_outputs(key); source_shutdown_handles @@ -446,7 +446,7 @@ impl RunningTopology { // depend on, and thus the closing of their buffer, will naturally cause them to shutdown, // which is why we don't do any manual triggering of shutdown here. for key in &diff.transforms.to_remove { - debug!(component = %key, "Removing transform."); + debug!(component_id = %key, "Removing transform."); let previous = self.tasks.remove(key).unwrap(); drop(previous); // detach and forget @@ -456,7 +456,7 @@ impl RunningTopology { } for key in &diff.transforms.to_change { - debug!(component = %key, "Changing transform."); + debug!(component_id = %key, "Changing transform."); self.remove_inputs(key, diff, new_config).await; self.remove_outputs(key); @@ -577,7 +577,7 @@ impl RunningTopology { })) .collect::>(); for key in &removed_sinks { - debug!(component = %key, "Removing sink."); + debug!(component_id = %key, "Removing sink."); self.remove_inputs(key, diff, new_config).await; } @@ -598,7 +598,7 @@ impl RunningTopology { .collect::>(); for key in &sinks_to_change { - debug!(component = %key, "Changing sink."); + debug!(component_id = %key, "Changing sink."); if reuse_buffers.contains(key) { self.detach_triggers .remove(key) @@ -630,7 +630,7 @@ impl RunningTopology { for key in &removed_sinks { let previous = self.tasks.remove(key).unwrap(); if wait_for_sinks.contains(key) { - debug!(message = "Waiting for sink to shutdown.", %key); + debug!(message = "Waiting for sink to shutdown.", component_id = %key); previous.await.unwrap().unwrap(); } else { drop(previous); // detach and forget @@ -641,7 +641,7 @@ impl RunningTopology { for key in &sinks_to_change { if wait_for_sinks.contains(key) { let previous = self.tasks.remove(key).unwrap(); - debug!(message = "Waiting for sink to shutdown.", %key); + debug!(message = "Waiting for sink to shutdown.", component_id = %key); let buffer = previous.await.unwrap().unwrap(); if reuse_buffers.contains(key) { @@ -751,7 +751,7 @@ impl RunningTopology { // We configure the outputs of any changed/added sources first, so they're available to any // transforms and sinks that come afterwards. for key in diff.sources.changed_and_added() { - debug!(component = %key, "Configuring outputs for source."); + debug!(component_id = %key, "Configuring outputs for source."); self.setup_outputs(key, new_pieces).await; } @@ -761,27 +761,27 @@ impl RunningTopology { .filter(|k| new_pieces.source_tasks.contains_key(k)) .collect(); for key in added_changed_table_sources.iter() { - debug!(component = %key, "Connecting outputs for enrichment table source."); + debug!(component_id = %key, "Connecting outputs for enrichment table source."); self.setup_outputs(key, new_pieces).await; } // We configure the outputs of any changed/added transforms next, for the same reason: we // need them to be available to any transforms and sinks that come afterwards. for key in diff.transforms.changed_and_added() { - debug!(component = %key, "Configuring outputs for transform."); + debug!(component_id = %key, "Configuring outputs for transform."); self.setup_outputs(key, new_pieces).await; } // Now that all possible outputs are configured, we can start wiring up inputs, starting // with transforms. for key in diff.transforms.changed_and_added() { - debug!(component = %key, "Connecting inputs for transform."); + debug!(component_id = %key, "Connecting inputs for transform."); self.setup_inputs(key, diff, new_pieces).await; } // Now that all sources and transforms are fully configured, we can wire up sinks. for key in diff.sinks.changed_and_added() { - debug!(component = %key, "Connecting inputs for sink."); + debug!(component_id = %key, "Connecting inputs for sink."); self.setup_inputs(key, diff, new_pieces).await; } let added_changed_tables: Vec<&ComponentKey> = diff @@ -790,7 +790,7 @@ impl RunningTopology { .filter(|k| new_pieces.inputs.contains_key(k)) .collect(); for key in added_changed_tables.iter() { - debug!(component = %key, "Connecting inputs for enrichment table sink."); + debug!(component_id = %key, "Connecting inputs for enrichment table sink."); self.setup_inputs(key, diff, new_pieces).await; } @@ -866,7 +866,7 @@ impl RunningTopology { ) { let outputs = new_pieces.outputs.remove(key).unwrap(); for (port, output) in outputs { - debug!(component = %key, output_id = ?port, "Configuring output for component."); + debug!(component_id = %key, output_id = ?port, "Configuring output for component."); let id = OutputId { component: key.clone(), @@ -903,7 +903,7 @@ impl RunningTopology { // If the input we're connecting to is changing, that means its outputs will have been // recreated, so instead of replacing a paused sink, we have to add it to this new // output for the first time, since there's nothing to actually replace at this point. - debug!(component = %key, fanout_id = %input, "Adding component input to fanout."); + debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout."); _ = output.send(ControlMessage::Add(key.clone(), tx.clone())); } else { @@ -911,7 +911,7 @@ impl RunningTopology { // components were changed, then the output must still exist, which means we paused // this component's connection to its output, so we have to replace that connection // now: - debug!(component = %key, fanout_id = %input, "Replacing component input in fanout."); + debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout."); _ = output.send(ControlMessage::Replace(key.clone(), tx.clone())); } @@ -955,14 +955,14 @@ impl RunningTopology { // because it isn't coming back. // // Case 3: This component is no longer connected to the input from new config. - debug!(component = %key, fanout_id = %input, "Removing component input from fanout."); + debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout."); _ = output.send(ControlMessage::Remove(key.clone())); } else { // We know that if this component is connected to a given input, and it isn't being // changed, then it will exist when we reconnect inputs, so we should pause it // now to pause further sends through that component until we reconnect: - debug!(component = %key, fanout_id = %input, "Pausing component input in fanout."); + debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout."); _ = output.send(ControlMessage::Pause(key.clone())); } @@ -978,7 +978,7 @@ impl RunningTopology { for (transform_key, transform) in unchanged_transforms { let changed_outputs = get_changed_outputs(diff, transform.inputs.clone()); for output_id in changed_outputs { - debug!(component = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); + debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); let input = self.inputs.get(transform_key).cloned().unwrap(); let output = self.outputs.get_mut(&output_id).unwrap(); @@ -993,7 +993,7 @@ impl RunningTopology { for (sink_key, sink) in unchanged_sinks { let changed_outputs = get_changed_outputs(diff, sink.inputs.clone()); for output_id in changed_outputs { - debug!(component = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); + debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); let input = self.inputs.get(sink_key).cloned().unwrap(); let output = self.outputs.get_mut(&output_id).unwrap(); @@ -1005,12 +1005,12 @@ impl RunningTopology { /// Starts any new or changed components in the given configuration diff. pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) { for key in &diff.sources.to_change { - debug!(message = "Spawning changed source.", key = %key); + debug!(message = "Spawning changed source.", component_id = %key); self.spawn_source(key, &mut new_pieces); } for key in &diff.sources.to_add { - debug!(message = "Spawning new source.", key = %key); + debug!(message = "Spawning new source.", component_id = %key); self.spawn_source(key, &mut new_pieces); } @@ -1029,32 +1029,32 @@ impl RunningTopology { .collect(); for key in changed_table_sources { - debug!(message = "Spawning changed enrichment table source.", key = %key); + debug!(message = "Spawning changed enrichment table source.", component_id = %key); self.spawn_source(key, &mut new_pieces); } for key in added_table_sources { - debug!(message = "Spawning new enrichment table source.", key = %key); + debug!(message = "Spawning new enrichment table source.", component_id = %key); self.spawn_source(key, &mut new_pieces); } for key in &diff.transforms.to_change { - debug!(message = "Spawning changed transform.", key = %key); + debug!(message = "Spawning changed transform.", component_id = %key); self.spawn_transform(key, &mut new_pieces); } for key in &diff.transforms.to_add { - debug!(message = "Spawning new transform.", key = %key); + debug!(message = "Spawning new transform.", component_id = %key); self.spawn_transform(key, &mut new_pieces); } for key in &diff.sinks.to_change { - debug!(message = "Spawning changed sink.", key = %key); + debug!(message = "Spawning changed sink.", component_id = %key); self.spawn_sink(key, &mut new_pieces); } for key in &diff.sinks.to_add { - trace!(message = "Spawning new sink.", key = %key); + trace!(message = "Spawning new sink.", component_id = %key); self.spawn_sink(key, &mut new_pieces); } @@ -1077,12 +1077,12 @@ impl RunningTopology { .collect(); for key in changed_tables { - debug!(message = "Spawning changed enrichment table sink.", key = %key); + debug!(message = "Spawning changed enrichment table sink.", component_id = %key); self.spawn_sink(key, &mut new_pieces); } for key in added_tables { - debug!(message = "Spawning enrichment table new sink.", key = %key); + debug!(message = "Spawning enrichment table new sink.", component_id = %key); self.spawn_sink(key, &mut new_pieces); } } From b6e52fa2829f69c744f455c703c2ee9b5fe23d8e Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 20 Oct 2025 10:07:48 -0400 Subject: [PATCH 06/13] remove unused 'vrl_position' special field --- lib/tracing-limit/src/lib.rs | 218 ++++++++++------------------------- 1 file changed, 64 insertions(+), 154 deletions(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index abb6601eb9aab..59054c4651443 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -23,18 +23,14 @@ //! //! **Only these fields create distinct rate limit groups:** //! - `component_id` - Different components are rate limited independently -//! - `vrl_position` - Different VRL script positions are rate limited independently //! //! **All other fields are ignored for grouping**, including: -//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid cardinality issues +//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid high cardinality issues //! - `message` - The log message itself doesn't differentiate groups //! - `internal_log_rate_limit` - Control field for enabling/disabling rate limiting //! - `internal_log_rate_secs` - Control field for customizing the rate limit window //! - Any custom fields you add //! -//! This restrictive approach prevents high-cardinality fields (like request IDs or UUIDs) from -//! causing unbounded memory growth. -//! //! ## Examples //! //! ```rust,ignore @@ -43,7 +39,7 @@ //! info!(component_id = "transform_2", "Processing event"); // Group B //! // Even though the message is identical, these are rate limited independently //! -//! // Example 2: Only component_id and vrl_position matter for grouping +//! // Example 2: Only component_id matters for grouping //! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C //! info!(component_id = "router", fanout_id = "output_2", "Routing event"); // Group C (same group!) //! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C (same group!) @@ -70,20 +66,15 @@ //! //! // Example 5: Same callsite with no fields = single rate limit group //! info!("Simple message"); // Group G -//! info!("Simple message"); // Group G (same group, will be rate limited together) +//! info!("Simple message"); // Group G //! info!("Simple message"); // Group G //! -//! // Example 6: VRL position tracking for script errors -//! info!(vrl_position = "line 10", "VRL compilation error"); // Group H -//! info!(vrl_position = "line 25", "VRL compilation error"); // Group I -//! // Each line in a VRL script gets its own rate limit group -//! -//! // Example 7: Custom fields are ignored for grouping -//! info!(component_id = "source", input_id = "in_1", "Received data"); // Group J -//! info!(component_id = "source", input_id = "in_2", "Received data"); // Group J (same group!) +//! // Example 6: Custom fields are ignored for grouping +//! info!(component_id = "source", input_id = "in_1", "Received data"); // Group H +//! info!(component_id = "source", input_id = "in_2", "Received data"); // Group H (same group!) //! // The input_id field is ignored - only component_id matters //! -//! // Example 8: Disabling rate limiting for specific logs +//! // Example 7: Disabling rate limiting for specific logs //! // Rate limiting is ON by default - explicitly disable for important logs //! warn!( //! component_id = "critical_component", @@ -92,7 +83,7 @@ //! ); //! // This event will NEVER be rate limited, regardless of how often it fires //! -//! // Example 9: Custom rate limit window for specific events +//! // Example 8: Custom rate limit window for specific events //! info!( //! component_id = "noisy_component", //! message = "Frequent status update", @@ -101,7 +92,7 @@ //! // Override the default window for this specific log //! ``` //! -//! This ensures logs from different components and VRL positions are rate limited independently, +//! This ensures logs from different components are rate limited independently, //! while preventing memory issues from high-cardinality fields. use std::fmt; @@ -133,7 +124,6 @@ const MESSAGE_FIELD: &str = "message"; // These fields will cause events to be independently rate limited by the values // for these keys const COMPONENT_ID_FIELD: &str = "component_id"; -const VRL_POSITION: &str = "vrl_position"; #[derive(Eq, PartialEq, Hash, Clone)] struct RateKeyIdentifier { @@ -488,34 +478,19 @@ impl From for TraceValue { /// /// **Tracked fields** (only these create distinct rate limit groups): /// - `component_id` - Different components are rate limited independently -/// - `vrl_position` - Different VRL script positions are rate limited independently -/// -/// **Ignored fields**: All other fields (including `fanout_id`, `input_id`, etc.) are ignored -/// for grouping purposes. This prevents high-cardinality fields from causing memory issues. /// -/// ## Example: -/// -/// ```rust,ignore -/// // These create separate groups (different component_id): -/// info!(component_id = "source_1", "Event received"); // Group A -/// info!(component_id = "source_2", "Event received"); // Group B -/// -/// // These share the same group (same component_id, fanout_id is ignored): -/// info!(component_id = "router", fanout_id = "output_1", "Routing"); // Group C -/// info!(component_id = "router", fanout_id = "output_2", "Routing"); // Group C (same!) +/// **Ignored fields**: All other fields are ignored for grouping purposes. This prevents high-cardinality fields from +/// causing memory issues. /// ``` #[derive(Default, Eq, PartialEq, Hash, Clone)] struct RateLimitedSpanKeys { component_id: Option, - vrl_position: Option, } impl RateLimitedSpanKeys { fn record(&mut self, field: &Field, value: TraceValue) { - match field.name() { - COMPONENT_ID_FIELD => self.component_id = Some(value), - VRL_POSITION => self.vrl_position = Some(value), - _ => {} + if field.name() == COMPONENT_ID_FIELD { + self.component_id = Some(value); } } @@ -523,9 +498,6 @@ impl RateLimitedSpanKeys { if let Some(component_id) = &other.component_id { self.component_id = Some(component_id.clone()); } - if let Some(vrl_position) = &other.vrl_position { - self.vrl_position = Some(vrl_position.clone()); - } } } @@ -745,12 +717,6 @@ mod test { if let Some(TraceValue::String(ref s)) = span_keys.component_id { visitor.fields.insert("component_id".to_string(), s.clone()); } - // Add vrl_position - if let Some(val) = &span_keys.vrl_position { - visitor - .fields - .insert("vrl_position".to_string(), val.to_string()); - } } } } @@ -833,14 +799,9 @@ mod test { tracing::subscriber::with_default(sub, || { for _ in 0..21 { for key in &["foo", "bar"] { - for line_number in &[1, 2] { - let span = - info_span!("span", component_id = &key, vrl_position = &line_number); - let _enter = span.enter(); - info!( - message = format!("Hello {key} on line_number {line_number}!").as_str() - ); - } + let span = info_span!("span", component_id = &key); + let _enter = span.enter(); + info!(message = format!("Hello {key}!").as_str()); } MockClock::advance(Duration::from_millis(100)); } @@ -848,37 +809,24 @@ mod test { let events = events.lock().unwrap(); + // Events with different component_id values create separate rate limit groups assert_eq!( *events, vec![ - event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), - event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), - event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), - event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), - event!("Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", component_id: "foo", vrl_position: "1"), - event!("Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", component_id: "foo", vrl_position: "2"), - event!("Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", component_id: "bar", vrl_position: "1"), - event!("Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", component_id: "bar", vrl_position: "2"), - event!("Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", component_id: "foo", vrl_position: "1"), - event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), - event!("Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", component_id: "foo", vrl_position: "2"), - event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), - event!("Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", component_id: "bar", vrl_position: "1"), - event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), - event!("Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", component_id: "bar", vrl_position: "2"), - event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), - event!("Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", component_id: "foo", vrl_position: "1"), - event!("Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", component_id: "foo", vrl_position: "2"), - event!("Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", component_id: "bar", vrl_position: "1"), - event!("Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", component_id: "bar", vrl_position: "2"), - event!("Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", component_id: "foo", vrl_position: "1"), - event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), - event!("Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", component_id: "foo", vrl_position: "2"), - event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), - event!("Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", component_id: "bar", vrl_position: "1"), - event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), - event!("Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", component_id: "bar", vrl_position: "2"), - event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), + event!("Hello foo!", component_id: "foo"), + event!("Hello bar!", component_id: "bar"), + event!("Internal log [Hello foo!] is being suppressed to avoid flooding.", component_id: "foo"), + event!("Internal log [Hello bar!] is being suppressed to avoid flooding.", component_id: "bar"), + event!("Internal log [Hello foo!] has been suppressed 9 times.", component_id: "foo"), + event!("Hello foo!", component_id: "foo"), + event!("Internal log [Hello bar!] has been suppressed 9 times.", component_id: "bar"), + event!("Hello bar!", component_id: "bar"), + event!("Internal log [Hello foo!] is being suppressed to avoid flooding.", component_id: "foo"), + event!("Internal log [Hello bar!] is being suppressed to avoid flooding.", component_id: "bar"), + event!("Internal log [Hello foo!] has been suppressed 9 times.", component_id: "foo"), + event!("Hello foo!", component_id: "foo"), + event!("Internal log [Hello bar!] has been suppressed 9 times.", component_id: "bar"), + event!("Hello bar!", component_id: "bar"), ] ); } @@ -894,13 +842,7 @@ mod test { tracing::subscriber::with_default(sub, || { for _ in 0..21 { for key in &["foo", "bar"] { - for line_number in &[1, 2] { - info!( - message = format!("Hello {key} on line_number {line_number}!").as_str(), - component_id = &key, - vrl_position = &line_number - ); - } + info!(message = format!("Hello {key}!").as_str(), component_id = &key); } MockClock::advance(Duration::from_millis(100)); } @@ -908,53 +850,24 @@ mod test { let events = events.lock().unwrap(); + // Events with different component_id values create separate rate limit groups assert_eq!( *events, vec![ - event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), - event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), - event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), - event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), - event!( - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding." - ), - event!( - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding." - ), - event!( - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding." - ), - event!( - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding." - ), - event!("Internal log [Hello foo on line_number 1!] has been suppressed 9 times."), - event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), - event!("Internal log [Hello foo on line_number 2!] has been suppressed 9 times."), - event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), - event!("Internal log [Hello bar on line_number 1!] has been suppressed 9 times."), - event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), - event!("Internal log [Hello bar on line_number 2!] has been suppressed 9 times."), - event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), - event!( - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding." - ), - event!( - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding." - ), - event!( - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding." - ), - event!( - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding." - ), - event!("Internal log [Hello foo on line_number 1!] has been suppressed 9 times."), - event!("Hello foo on line_number 1!", component_id: "foo", vrl_position: "1"), - event!("Internal log [Hello foo on line_number 2!] has been suppressed 9 times."), - event!("Hello foo on line_number 2!", component_id: "foo", vrl_position: "2"), - event!("Internal log [Hello bar on line_number 1!] has been suppressed 9 times."), - event!("Hello bar on line_number 1!", component_id: "bar", vrl_position: "1"), - event!("Internal log [Hello bar on line_number 2!] has been suppressed 9 times."), - event!("Hello bar on line_number 2!", component_id: "bar", vrl_position: "2"), + event!("Hello foo!", component_id: "foo"), + event!("Hello bar!", component_id: "bar"), + event!("Internal log [Hello foo!] is being suppressed to avoid flooding."), + event!("Internal log [Hello bar!] is being suppressed to avoid flooding."), + event!("Internal log [Hello foo!] has been suppressed 9 times."), + event!("Hello foo!", component_id: "foo"), + event!("Internal log [Hello bar!] has been suppressed 9 times."), + event!("Hello bar!", component_id: "bar"), + event!("Internal log [Hello foo!] is being suppressed to avoid flooding."), + event!("Internal log [Hello bar!] is being suppressed to avoid flooding."), + event!("Internal log [Hello foo!] has been suppressed 9 times."), + event!("Hello foo!", component_id: "foo"), + event!("Internal log [Hello bar!] has been suppressed 9 times."), + event!("Hello bar!", component_id: "bar"), ] ); } @@ -1111,21 +1024,21 @@ mod test { #[test] #[serial] - fn nested_spans_merges_different_fields() { + fn nested_spans_ignores_untracked_fields() { let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() .with(RateLimitedLayer::new(recorder).with_default_limit(1)); tracing::subscriber::with_default(sub, || { - // Parent has component_id, child has vrl_position - both should be included + // Parent has component_id, child has some_field - only component_id is tracked let outer = info_span!("outer", component_id = "transform"); let _outer_guard = outer.enter(); for _ in 0..21 { - let inner = info_span!("inner", vrl_position = "line 42"); + let inner = info_span!("inner", some_field = "value"); let _inner_guard = inner.enter(); - info!(message = "VRL event"); + info!(message = "Event message"); drop(_inner_guard); MockClock::advance(Duration::from_millis(100)); @@ -1134,33 +1047,30 @@ mod test { let events = events.lock().unwrap(); - // Events should have BOTH component_id from parent AND vrl_position from child + // Events should have component_id from parent, some_field from child is ignored for grouping + // All events are in the same rate limit group assert_eq!( *events, vec![ - event!("VRL event", component_id: "transform", vrl_position: "line 42"), + event!("Event message", component_id: "transform"), event!( - "Internal log [VRL event] is being suppressed to avoid flooding.", - component_id: "transform", - vrl_position: "line 42" + "Internal log [Event message] is being suppressed to avoid flooding.", + component_id: "transform" ), event!( - "Internal log [VRL event] has been suppressed 9 times.", - component_id: "transform", - vrl_position: "line 42" + "Internal log [Event message] has been suppressed 9 times.", + component_id: "transform" ), - event!("VRL event", component_id: "transform", vrl_position: "line 42"), + event!("Event message", component_id: "transform"), event!( - "Internal log [VRL event] is being suppressed to avoid flooding.", - component_id: "transform", - vrl_position: "line 42" + "Internal log [Event message] is being suppressed to avoid flooding.", + component_id: "transform" ), event!( - "Internal log [VRL event] has been suppressed 9 times.", - component_id: "transform", - vrl_position: "line 42" + "Internal log [Event message] has been suppressed 9 times.", + component_id: "transform" ), - event!("VRL event", component_id: "transform", vrl_position: "line 42"), + event!("Event message", component_id: "transform"), ] ); } From 5cfb9c057cbc729b55991e896f3b44d29ed7dff9 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 20 Oct 2025 10:10:27 -0400 Subject: [PATCH 07/13] make fmt --- lib/tracing-limit/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index 59054c4651443..c99a31883a4a4 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -842,7 +842,10 @@ mod test { tracing::subscriber::with_default(sub, || { for _ in 0..21 { for key in &["foo", "bar"] { - info!(message = format!("Hello {key}!").as_str(), component_id = &key); + info!( + message = format!("Hello {key}!").as_str(), + component_id = &key + ); } MockClock::advance(Duration::from_millis(100)); } From 5188f8dd6de55fffdc516e91943c7571cac1d647 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 20 Oct 2025 10:17:17 -0400 Subject: [PATCH 08/13] comments --- lib/tracing-limit/src/lib.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index c99a31883a4a4..80b148031f9b3 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -25,7 +25,7 @@ //! - `component_id` - Different components are rate limited independently //! //! **All other fields are ignored for grouping**, including: -//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid high cardinality issues +//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid resource/cost implications from high-cardinality tags //! - `message` - The log message itself doesn't differentiate groups //! - `internal_log_rate_limit` - Control field for enabling/disabling rate limiting //! - `internal_log_rate_secs` - Control field for customizing the rate limit window @@ -45,7 +45,7 @@ //! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C (same group!) //! info!(component_id = "router", fanout_id = "output_1", input_id = "kafka", "Routing event"); // Group C (same!) //! // All of these share the same group because they have the same component_id -//! // The fanout_id and input_id fields are ignored to prevent memory issues +//! // The fanout_id and input_id fields are ignored to avoid resource/cost implications //! //! // Example 3: Span fields contribute to grouping //! let span = info_span!("process", component_id = "transform_1"); @@ -93,7 +93,7 @@ //! ``` //! //! This ensures logs from different components are rate limited independently, -//! while preventing memory issues from high-cardinality fields. +//! while avoiding resource/cost implications from high-cardinality tags. use std::fmt; @@ -479,8 +479,7 @@ impl From for TraceValue { /// **Tracked fields** (only these create distinct rate limit groups): /// - `component_id` - Different components are rate limited independently /// -/// **Ignored fields**: All other fields are ignored for grouping purposes. This prevents high-cardinality fields from -/// causing memory issues. +/// **Ignored fields**: All other fields are ignored for grouping purposes. This avoids resource/cost implications from high-cardinality tags. /// ``` #[derive(Default, Eq, PartialEq, Hash, Clone)] struct RateLimitedSpanKeys { From 383d557f2c1ed21ad1d97f652252b002baeb4d69 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 20 Oct 2025 10:19:34 -0400 Subject: [PATCH 09/13] changelog --- changelog.d/internal_log_component_id_field.breaking.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changelog.d/internal_log_component_id_field.breaking.md diff --git a/changelog.d/internal_log_component_id_field.breaking.md b/changelog.d/internal_log_component_id_field.breaking.md new file mode 100644 index 0000000000000..23890c7374ce1 --- /dev/null +++ b/changelog.d/internal_log_component_id_field.breaking.md @@ -0,0 +1,4 @@ +Vector's internal topology logs now use the `component_id` field name instead of `component` or `key`. +If you are monitoring or filtering Vector's internal logs based on these field names, update your queries to use `component_id`. + +authors: pront From 7fc81d40bf450137accd71a527ea1005b2b72c17 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 20 Oct 2025 10:20:38 -0400 Subject: [PATCH 10/13] make fmt --- changelog.d/internal_log_component_id_field.breaking.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/internal_log_component_id_field.breaking.md b/changelog.d/internal_log_component_id_field.breaking.md index 23890c7374ce1..5237aa144c86c 100644 --- a/changelog.d/internal_log_component_id_field.breaking.md +++ b/changelog.d/internal_log_component_id_field.breaking.md @@ -1,4 +1,4 @@ -Vector's internal topology logs now use the `component_id` field name instead of `component` or `key`. +Vector's internal topology logs now use the `component_id` field name instead of `component` or `key`. If you are monitoring or filtering Vector's internal logs based on these field names, update your queries to use `component_id`. authors: pront From 03ae4de6b4a6f46297853b3fd967ece06e915029 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 20 Oct 2025 11:04:53 -0400 Subject: [PATCH 11/13] refactor tests --- lib/tracing-limit/src/lib.rs | 81 ++++++++++++++---------------------- 1 file changed, 31 insertions(+), 50 deletions(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index 80b148031f9b3..2e3bd78280929 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -725,14 +725,25 @@ mod test { } } - #[test] - #[serial] - fn rate_limits() { + /// Helper function to set up a test with a rate-limited subscriber. + /// Returns the events Arc for asserting on collected events. + fn setup_test( + default_limit: u64, + ) -> ( + Arc>>, + impl Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, + ) { let events: Arc>> = Default::default(); - let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + .with(RateLimitedLayer::new(recorder).with_default_limit(default_limit)); + (events, sub) + } + + #[test] + #[serial] + fn rate_limits() { + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { for _ in 0..21 { info!(message = "Hello world!"); @@ -759,30 +770,28 @@ mod test { #[test] #[serial] fn override_rate_limit_at_callsite() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(100)); + let (events, sub) = setup_test(100); tracing::subscriber::with_default(sub, || { - for _ in 0..21 { - info!(message = "Hello world!", internal_log_rate_secs = 1); + for _ in 0..31 { + info!(message = "Hello world!", internal_log_rate_secs = 2); MockClock::advance(Duration::from_millis(100)); } }); let events = events.lock().unwrap(); + // With a 2-second window and 100ms advances, we get: + // - Event every 20 iterations (2000ms / 100ms = 20) + // - First window: iteration 0-19 (suppressed 19 times after first 2) + // - Second window: iteration 20-39 (but we only go to 30) assert_eq!( *events, vec![ event!("Hello world!"), event!("Internal log [Hello world!] is being suppressed to avoid flooding."), - event!("Internal log [Hello world!] has been suppressed 9 times."), + event!("Internal log [Hello world!] has been suppressed 19 times."), event!("Hello world!"), event!("Internal log [Hello world!] is being suppressed to avoid flooding."), - event!("Internal log [Hello world!] has been suppressed 9 times."), - event!("Hello world!"), ] ); } @@ -790,11 +799,7 @@ mod test { #[test] #[serial] fn rate_limit_by_span_key() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { for _ in 0..21 { for key in &["foo", "bar"] { @@ -833,11 +838,7 @@ mod test { #[test] #[serial] fn rate_limit_by_event_key() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { for _ in 0..21 { for key in &["foo", "bar"] { @@ -877,11 +878,7 @@ mod test { #[test] #[serial] fn disabled_rate_limit() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { for _ in 0..21 { info!(message = "Hello world!", internal_log_rate_limit = false); @@ -899,11 +896,7 @@ mod test { #[test] #[serial] fn rate_limit_ignores_fanout_id() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { for i in 0..21 { // Call the SAME info! macro with different fanout_id values @@ -939,11 +932,7 @@ mod test { #[test] #[serial] fn rate_limit_ignores_non_special_fields() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { for i in 0..21 { // Call the SAME info! macro multiple times per iteration with varying fanout_id @@ -985,11 +974,7 @@ mod test { #[test] #[serial] fn nested_spans_child_takes_precedence() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { // Create nested spans where child overrides parent's component_id let outer = info_span!("outer", component_id = "parent"); @@ -1027,11 +1012,7 @@ mod test { #[test] #[serial] fn nested_spans_ignores_untracked_fields() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { // Parent has component_id, child has some_field - only component_id is tracked let outer = info_span!("outer", component_id = "transform"); From 5e771d262319a1d1e273f2cd370c9be632ae64a0 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 20 Oct 2025 11:13:01 -0400 Subject: [PATCH 12/13] improve cli doc --- src/cli.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index b4fcbed51ace6..5b9d39fc1b3cd 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -187,8 +187,8 @@ pub struct RootOpts { /// shows a suppression warning, and subsequent occurrences are silent until the /// window expires. /// - /// Logs are grouped by their location in the code and contextual fields so different log instances can be rate - /// limited independently. + /// Logs are grouped by their location in the code and the `component_id` field, so logs + /// from different components are rate limited independently. /// /// Examples: /// - 1: Very verbose, logs can repeat every second From 4d046613a66ff175019e2b10e2f981cf148764c9 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 20 Oct 2025 14:58:53 -0400 Subject: [PATCH 13/13] added rate_limit_same_message_different_component test case and removed duplication --- lib/tracing-limit/src/lib.rs | 105 ++++++++++------------------------- 1 file changed, 30 insertions(+), 75 deletions(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index 2e3bd78280929..62113f36ce7cc 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -796,45 +796,6 @@ mod test { ); } - #[test] - #[serial] - fn rate_limit_by_span_key() { - let (events, sub) = setup_test(1); - tracing::subscriber::with_default(sub, || { - for _ in 0..21 { - for key in &["foo", "bar"] { - let span = info_span!("span", component_id = &key); - let _enter = span.enter(); - info!(message = format!("Hello {key}!").as_str()); - } - MockClock::advance(Duration::from_millis(100)); - } - }); - - let events = events.lock().unwrap(); - - // Events with different component_id values create separate rate limit groups - assert_eq!( - *events, - vec![ - event!("Hello foo!", component_id: "foo"), - event!("Hello bar!", component_id: "bar"), - event!("Internal log [Hello foo!] is being suppressed to avoid flooding.", component_id: "foo"), - event!("Internal log [Hello bar!] is being suppressed to avoid flooding.", component_id: "bar"), - event!("Internal log [Hello foo!] has been suppressed 9 times.", component_id: "foo"), - event!("Hello foo!", component_id: "foo"), - event!("Internal log [Hello bar!] has been suppressed 9 times.", component_id: "bar"), - event!("Hello bar!", component_id: "bar"), - event!("Internal log [Hello foo!] is being suppressed to avoid flooding.", component_id: "foo"), - event!("Internal log [Hello bar!] is being suppressed to avoid flooding.", component_id: "bar"), - event!("Internal log [Hello foo!] has been suppressed 9 times.", component_id: "foo"), - event!("Hello foo!", component_id: "foo"), - event!("Internal log [Hello bar!] has been suppressed 9 times.", component_id: "bar"), - event!("Hello bar!", component_id: "bar"), - ] - ); - } - #[test] #[serial] fn rate_limit_by_event_key() { @@ -893,42 +854,6 @@ mod test { assert!(events.iter().all(|e| e == &event!("Hello world!"))); } - #[test] - #[serial] - fn rate_limit_ignores_fanout_id() { - let (events, sub) = setup_test(1); - tracing::subscriber::with_default(sub, || { - for i in 0..21 { - // Call the SAME info! macro with different fanout_id values - // to verify that fanout_id doesn't create separate rate limit groups - let fanout = if i % 2 == 0 { "i1" } else { "i2" }; - info!( - message = "Test message.", - component_id = "c1", - fanout_id = fanout - ); - MockClock::advance(Duration::from_millis(100)); - } - }); - - let events = events.lock().unwrap(); - - // All events share the same rate limit group (same callsite + component_id, fanout_id is ignored) - // Even though fanout_id alternates between i1 and i2, they're all in one group - assert_eq!( - *events, - vec![ - event!("Test message.", component_id: "c1", fanout_id: "i1"), - event!("Internal log [Test message.] is being suppressed to avoid flooding."), - event!("Internal log [Test message.] has been suppressed 9 times."), - event!("Test message.", component_id: "c1", fanout_id: "i1"), - event!("Internal log [Test message.] is being suppressed to avoid flooding."), - event!("Internal log [Test message.] has been suppressed 9 times."), - event!("Test message.", component_id: "c1", fanout_id: "i1"), - ] - ); - } - #[test] #[serial] fn rate_limit_ignores_non_special_fields() { @@ -1057,4 +982,34 @@ mod test { ] ); } + + #[test] + #[serial] + fn rate_limit_same_message_different_component() { + let (events, sub) = setup_test(1); + tracing::subscriber::with_default(sub, || { + // Use a loop with the SAME callsite to demonstrate that identical messages + // with different component_ids create separate rate limit groups + for component in &["foo", "foo", "bar"] { + info!(message = "Hello!", component_id = component); + MockClock::advance(Duration::from_millis(100)); + } + }); + + let events = events.lock().unwrap(); + + // The first "foo" event is emitted normally (count=0) + // The second "foo" event triggers suppression warning (count=1) + // The "bar" event is emitted normally (count=0 for its group) + // This proves that even with identical message text, different component_ids + // create separate rate limit groups + assert_eq!( + *events, + vec![ + event!("Hello!", component_id: "foo"), + event!("Internal log [Hello!] is being suppressed to avoid flooding."), + event!("Hello!", component_id: "bar"), + ] + ); + } }