diff --git a/changelog.d/23994_aggregate_transform.enhancement.md b/changelog.d/23994_aggregate_transform.enhancement.md new file mode 100644 index 0000000000000..0f824c6fe28e4 --- /dev/null +++ b/changelog.d/23994_aggregate_transform.enhancement.md @@ -0,0 +1,3 @@ +The aggregate transform now supports aggregating metrics into Distribution metric types, enabling statistical analysis of metric values over time windows. + +authors: jlambatl diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 6c3f522f10a27..85858c4a0d0ce 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -11,7 +11,7 @@ use vector_lib::{ configurable::configurable_component, event::{ MetricValue, - metric::{Metric, MetricData, MetricKind, MetricSeries}, + metric::{Metric, MetricData, MetricKind, MetricSeries, Sample, StatisticKind}, }, }; @@ -73,6 +73,11 @@ pub enum AggregationMode { /// Stdev value of absolute metric, ignores incremental Stdev, + + /// Aggregates absolute metrics into a distribution, ignores incremental. + /// Histograms: For each count in the bucket, a sample is added at the bucket's upper limit + /// This preserves the distribution shape. + Distribution, } const fn default_mode() -> AggregationMode { @@ -149,10 +154,11 @@ impl Aggregate { AggregationMode::Max | AggregationMode::Min => { self.record_comparison(series, data, metadata) } - AggregationMode::Mean | AggregationMode::Stdev => match data.kind { - MetricKind::Incremental => (), - MetricKind::Absolute => { - if matches!(data.value, MetricValue::Gauge { value: _ }) { + AggregationMode::Mean | AggregationMode::Stdev | AggregationMode::Distribution => { + match data.kind { + MetricKind::Incremental => (), + MetricKind::Absolute => { + // For Distribution mode, we accept any metric value type match self.multi_map.entry(series) { Entry::Occupied(mut entry) => { let existing = entry.get_mut(); @@ -164,7 +170,7 @@ impl Aggregate { } } } - }, + } } emit!(AggregateEventRecorded); @@ -313,6 +319,11 @@ impl Aggregate { let metric = Metric::from_parts(series, final_stdev, final_metadata); output.push(Event::Metric(metric)); } + AggregationMode::Distribution => { + let distribution_data = self.create_distribution_from_entries(&entries); + let metric = Metric::from_parts(series, distribution_data, final_metadata); + output.push(Event::Metric(metric)); + } _ => (), } } @@ -320,6 +331,73 @@ impl Aggregate { self.prev_map = map; emit!(AggregateFlushed); } + + // Creates a distribution metric from a collection of metric entries by converting various metric types into samples. + // This function handles Gauge, Set, Distribution, and AggregatedHistogram metric types. + // For Gauge and Set types, it creates samples directly from their values. + // For Distribution types, it merges existing samples. + // For AggregatedHistogram types, it generates samples based on bucket counts and upper limits. + // The resulting samples are sorted and used to create a new Distribution metric. + // The distribution is sorted. + fn create_distribution_from_entries(&self, entries: &[MetricEntry]) -> MetricData { + let mut samples = Vec::new(); + + for (data, _) in entries { + match data.value() { + MetricValue::Gauge { value } => { + samples.push(Sample { + value: *value, + rate: 1, + }); + } + MetricValue::Set { values } => { + // For sets, create a sample for each unique value with rate 1 + // sets use strings, that means we need to parse them to f64 + for value in values { + if let Ok(parsed_value) = value.parse::() { + samples.push(Sample { + value: parsed_value, + rate: 1, + }); + } + } + } + MetricValue::Distribution { + samples: dist_samples, + .. + } => { + // If already a distribution, merge the samples + samples.extend(dist_samples.clone()); + } + MetricValue::AggregatedHistogram { buckets, .. } => { + for bucket in buckets { + // For each count in the bucket, add a sample at the bucket's upper limit + // This preserves the distribution shape + for _ in 0..bucket.count { + samples.push(Sample { + value: bucket.upper_limit, + rate: 1, + }); + } + } + } + _ => (), + } + } + samples.sort_by(|a, b| { + a.value + .partial_cmp(&b.value) + .unwrap_or(std::cmp::Ordering::Equal) + }); + MetricData { + time: entries.first().unwrap().0.time, + kind: MetricKind::Absolute, + value: MetricValue::Distribution { + samples, + statistic: StatisticKind::Histogram, + }, + } + } } impl TaskTransform for Aggregate { @@ -378,6 +456,7 @@ mod tests { test_util::components::assert_transform_compliance, transforms::test::create_topology, }; + use vector_lib::event::metric::Bucket; #[test] fn generate_config() { @@ -397,6 +476,32 @@ mod tests { event } + // Helper function to verify distribution metric properties + fn assert_distribution_samples( + event: &Event, + expected_samples: &[f64], + expected_statistic: StatisticKind, + ) { + if let MetricValue::Distribution { samples, statistic } = event.as_metric().value() { + assert_eq!( + samples.len(), + expected_samples.len(), + "expected {} samples in distribution but got {}", + expected_samples.len(), + samples.len() + ); + assert_eq!(*statistic, expected_statistic); + + let values: Vec = samples.iter().map(|s| s.value).collect(); + assert_eq!( + values, expected_samples, + "samples should match expected values and be sorted" + ); + } else { + panic!("Expected Distribution metric value"); + } + } + #[test] fn incremental_auto() { let mut agg = Aggregate::new(&AggregateConfig { @@ -1142,4 +1247,443 @@ interval_ms = 999999 }) .await; } + + #[test] + fn absolute_distribution() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let gauge_a_1 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 25.0 }, + ); + let gauge_a_2 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 30.0 }, + ); + let gauge_a_3 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 35.0 }, + ); + let gauge_a_4 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 36.0 }, + ); + let gauge_a_5 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 37.0 }, + ); + let gauge_a_6 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 38.0 }, + ); + let gauge_a_7 = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 39.0 }, + ); + let counter_a_1 = make_metric( + "counter_a", + MetricKind::Incremental, + MetricValue::Counter { value: 110.0 }, + ); + // Record seven gauge values in non-sorted order to verify sorting + agg.record(gauge_a_4); // 36.0 + agg.record(gauge_a_1); // 25.0 + agg.record(gauge_a_7); // 39.0 + agg.record(gauge_a_2); // 30.0 + agg.record(gauge_a_5); // 37.0 + agg.record(gauge_a_3); // 35.0 + agg.record(gauge_a_6); // 38.0 + // Verify that incremental metrics are ignored + agg.record(counter_a_1); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + + // Verify it's a distribution with 7 samples, properly sorted + assert_distribution_samples( + &out[0], + &[25.0, 30.0, 35.0, 36.0, 37.0, 38.0, 39.0], + StatisticKind::Histogram, + ); + } + + #[test] + fn absolute_distribution_distributions() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let distribution_a_1 = make_metric( + "dist_a", + MetricKind::Absolute, + MetricValue::Distribution { + samples: vec![35.0, 25.0, 36.0, 30.0] // Intentionally unsorted + .into_iter() + .map(|v| Sample { value: v, rate: 1 }) + .collect(), + statistic: StatisticKind::Summary, + }, + ); + let distribution_a_2 = make_metric( + "dist_a", + MetricKind::Absolute, + MetricValue::Distribution { + samples: vec![39.0, 37.0, 38.0] // Intentionally unsorted + .into_iter() + .map(|v| Sample { value: v, rate: 1 }) + .collect(), + statistic: StatisticKind::Summary, + }, + ); + let distribution_a_3 = make_metric( + "dist_a", + MetricKind::Absolute, + MetricValue::Distribution { + samples: vec![456.2, 99.0, 113.0] // Intentionally unsorted + .into_iter() + .map(|v| Sample { value: v, rate: 1 }) + .collect(), + statistic: StatisticKind::Summary, + }, + ); + // Record three Distribution values + agg.record(distribution_a_1); + agg.record(distribution_a_2); + agg.record(distribution_a_3); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + + // Verify it's a distribution with 10 samples, properly sorted + assert_distribution_samples( + &out[0], + &[25.0, 30.0, 35.0, 36.0, 37.0, 38.0, 39.0, 99.0, 113.0, 456.2], + StatisticKind::Histogram, + ); + } + + #[test] + fn distribution_histogram() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let histogram_a_1 = make_metric( + "histogram_a", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 5, + sum: 18.0, + buckets: vec![ + Bucket { + upper_limit: 1.0, + count: 1, + }, + Bucket { + upper_limit: 2.0, + count: 2, + }, + Bucket { + upper_limit: 5.0, + count: 1, + }, + Bucket { + upper_limit: 10.0, + count: 1, + }, + ], + }, + ); + let histogram_a_2 = make_metric( + "histogram_a", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 5, + sum: 18.0, + buckets: vec![ + Bucket { + upper_limit: 1.0, + count: 1, + }, + Bucket { + upper_limit: 2.0, + count: 2, + }, + Bucket { + upper_limit: 5.0, + count: 1, + }, + Bucket { + upper_limit: 10.0, + count: 1, + }, + ], + }, + ); + let histogram_a_3 = make_metric( + "histogram_a", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 5, + sum: 18.0, + buckets: vec![ + Bucket { + upper_limit: 1.0, + count: 1, + }, + Bucket { + upper_limit: 2.0, + count: 2, + }, + Bucket { + upper_limit: 5.0, + count: 1, + }, + Bucket { + upper_limit: 10.0, + count: 1, + }, + ], + }, + ); + + let expected_vals = vec![ + 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 5.0, 5.0, 5.0, 10.0, 10.0, 10.0, + ]; + + // Record Metrics Values + agg.record(histogram_a_1); + agg.record(histogram_a_2); + agg.record(histogram_a_3); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + + // Verify it's a distribution with 15 samples + assert_distribution_samples(&out[0], &expected_vals, StatisticKind::Histogram); + } + + #[test] + fn distribution_set() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let set_a_1 = make_metric( + "set_a", + MetricKind::Absolute, + MetricValue::Set { + values: vec![ + "39".into(), + "25".into(), + "99".into(), + "35".into(), + "113".into(), + "30".into(), + "456.2".into(), + "36".into(), + "37".into(), + "38".into(), + ] + .into_iter() + .collect(), + }, + ); + + let set_a_2 = make_metric( + "set_a", + MetricKind::Absolute, + MetricValue::Set { + values: vec![ + "40".into(), + "26".into(), + "100".into(), + "36".into(), + "114".into(), + "31".into(), + "556.2".into(), + "37".into(), + "38".into(), + "39".into(), + ] + .into_iter() + .collect(), + }, + ); + // Record two set values + agg.record(set_a_1); + agg.record(set_a_2); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!( + 1, + out.len(), + "expected 1 output metric but got {}", + out.len() + ); + + // Verify it's a distribution with 20 samples (10 from each set), properly sorted + let expected = vec![ + 25.0, 26.0, 30.0, 31.0, 35.0, 36.0, 36.0, 37.0, 37.0, 38.0, 38.0, 39.0, 39.0, 40.0, + 99.0, 100.0, 113.0, 114.0, 456.2, 556.2, + ]; + assert_distribution_samples(&out[0], &expected, StatisticKind::Histogram); + } + + #[test] + fn distribution_mixed_types() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + // Test that different metric series can each become distributions + let gauge_a = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 10.0 }, + ); + + let distribution_b = make_metric( + "dist_b", + MetricKind::Absolute, + MetricValue::Distribution { + samples: vec![20.0, 30.0] + .into_iter() + .map(|v| Sample { value: v, rate: 1 }) + .collect(), + statistic: StatisticKind::Summary, + }, + ); + + let histogram_c = make_metric( + "histogram_c", + MetricKind::Absolute, + MetricValue::AggregatedHistogram { + count: 2, + sum: 90.0, + buckets: vec![ + Bucket { + upper_limit: 40.0, + count: 1, + }, + Bucket { + upper_limit: 50.0, + count: 1, + }, + ], + }, + ); + + agg.record(gauge_a); + agg.record(distribution_b); + agg.record(histogram_c); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(3, out.len()); // Each different series produces its own distribution + + // Verify all outputs are distributions + for event in out { + if let MetricValue::Distribution { samples, statistic } = event.as_metric().value() { + assert_eq!(*statistic, StatisticKind::Histogram); + assert!(!samples.is_empty()); + } else { + panic!("Expected Distribution metric value"); + } + } + } + + #[test] + fn distribution_set_with_invalid_values() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let set_with_invalid = make_metric( + "set_a", + MetricKind::Absolute, + MetricValue::Set { + values: vec![ + "10.0".into(), + "not_a_number".into(), // Should be skipped + "20.0".into(), + "invalid".into(), // Should be skipped + "30.0".into(), + ] + .into_iter() + .collect(), + }, + ); + + agg.record(set_with_invalid); + + let mut out = vec![]; + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + + // Only the 3 valid numeric values should be included + assert_distribution_samples(&out[0], &[10.0, 20.0, 30.0], StatisticKind::Histogram); + } + + #[test] + fn distribution_multiple_series() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + mode: AggregationMode::Distribution, + }) + .unwrap(); + + let gauge_a = make_metric( + "gauge_a", + MetricKind::Absolute, + MetricValue::Gauge { value: 10.0 }, + ); + let gauge_b = make_metric( + "gauge_b", + MetricKind::Absolute, + MetricValue::Gauge { value: 20.0 }, + ); + + agg.record(gauge_a); + agg.record(gauge_b); + + let mut out = vec![]; + agg.flush_into(&mut out); + + // Should produce 2 separate distributions + assert_eq!(2, out.len()); + + for event in out { + if let MetricValue::Distribution { samples, .. } = event.as_metric().value() { + assert_eq!(samples.len(), 1); + } else { + panic!("Expected Distribution metric value"); + } + } + } } diff --git a/website/cue/reference/components/transforms/aggregate.cue b/website/cue/reference/components/transforms/aggregate.cue index 37980aa252e23..f064117be3665 100644 --- a/website/cue/reference/components/transforms/aggregate.cue +++ b/website/cue/reference/components/transforms/aggregate.cue @@ -175,7 +175,9 @@ components: transforms: aggregate: { aggregated into a single `incremental` `counter` with a value of 23. Two `absolute` `gauge` metrics with values 93 and 95 would result in a single `absolute` `gauge` with the value of 95. More complex types like `distribution`, `histogram`, `set`, and `summary` behave similarly with `incremental` - values being combined in a manner that makes sense based on their type. + values being combined in a manner that makes sense based on their type. When using distribution as the + aggregation mode, the individual samples are collected and their values are combined into a single + sorted distribution representation. """ } diff --git a/website/cue/reference/components/transforms/generated/aggregate.cue b/website/cue/reference/components/transforms/generated/aggregate.cue index 725285a35281b..b1b3360135feb 100644 --- a/website/cue/reference/components/transforms/generated/aggregate.cue +++ b/website/cue/reference/components/transforms/generated/aggregate.cue @@ -20,9 +20,14 @@ generated: components: transforms: aggregate: configuration: { type: string: { default: "Auto" enum: { - Auto: "Default mode. Sums incremental metrics and uses the latest value for absolute metrics." - Count: "Counts metrics for incremental and absolute metrics" - Diff: "Returns difference between latest value for absolute, ignores incremental" + Auto: "Default mode. Sums incremental metrics and uses the latest value for absolute metrics." + Count: "Counts metrics for incremental and absolute metrics" + Diff: "Returns difference between latest value for absolute, ignores incremental" + Distribution: """ + Aggregates absolute metrics into a distribution, ignores incremental. + Histograms: For each count in the bucket, a sample is added at the bucket's upper limit + This preserves the distribution shape. + """ Latest: "Returns the latest value for absolute metrics, ignores incremental" Max: "Max value of absolute metric, ignores incremental" Mean: "Mean value of absolute metric, ignores incremental"