diff --git a/rust/otap-dataflow/configs/README.md b/rust/otap-dataflow/configs/README.md index 1aa828ed4e..36e2be84bc 100644 --- a/rust/otap-dataflow/configs/README.md +++ b/rust/otap-dataflow/configs/README.md @@ -34,6 +34,13 @@ Demonstrates the filter processor: - Generates fake data -> filter processor -> debug processor -> noop exporter +### `fake-metric-filter-debug-noop.yaml` + +Demonstrates metric-name filtering: + +- Generates fake metrics -> filter processor by metric name -> debug processor + -> noop exporter + ### `fake-transform-debug-noop.yaml` Demonstrate using the transform processor to transform data @@ -80,11 +87,10 @@ Generates mixed-tenant traffic using weighted resource attribute rotation: The `resource_attributes` field accepts three forms: -| Form | Description | -| ---- | ----------- | -| Single map | All batches carry the same attributes (weight 1) | -| List of maps | Equal round-robin rotation across entries (weight 1 each) | -| List of weighted entries (`attrs` + `weight`) | Each entry receives batches proportional to its weight | +- Single map: all batches carry the same attributes (weight 1). +- List of maps: equal round-robin rotation across entries (weight 1 each). +- List of weighted entries (`attrs` + `weight`): each entry receives batches + proportional to its weight. > **Note:** `resource_attributes` only applies to `data_source: static`. > With `generation_strategy: pre_generated`, only the first attribute set is used. 
@@ -163,14 +169,21 @@ Syslog/CEF receiver with performance metrics: To send a quick test message (UDP): ```bash -echo "<134>$(date '+%b %d %H:%M:%S') testhost testtag: Test message" | nc -u -w1 127.0.0.1 5140 +echo "<134>$(date '+%b %d %H:%M:%S') testhost testtag: Test message" \ + | nc -u -w1 127.0.0.1 5140 ``` -For sustained load testing, see the [load generator](../../tools/pipeline_perf_test/load_generator/readme.md): +For sustained load testing, see the +[load generator](../../tools/pipeline_perf_test/load_generator/readme.md): ```bash cd tools/pipeline_perf_test/load_generator -python loadgen.py --load-type syslog --syslog-server 127.0.0.1 --syslog-port 5140 --syslog-transport udp --duration 15 +python loadgen.py \ + --load-type syslog \ + --syslog-server 127.0.0.1 \ + --syslog-port 5140 \ + --syslog-transport udp \ + --duration 15 ``` > **Note:** The default `syslog-perf.yaml` config only enables UDP. diff --git a/rust/otap-dataflow/configs/fake-metric-filter-debug-noop.yaml b/rust/otap-dataflow/configs/fake-metric-filter-debug-noop.yaml new file mode 100644 index 0000000000..b63490ce3e --- /dev/null +++ b/rust/otap-dataflow/configs/fake-metric-filter-debug-noop.yaml @@ -0,0 +1,50 @@ +version: otel_dataflow/v1 +engine: {} +groups: + default: + pipelines: + main: + policies: + channel_capacity: + control: + node: 100 + pipeline: 100 + pdata: 128 + + nodes: + receiver: + type: receiver:traffic_generator + config: + data_source: static + traffic_config: + max_signal_count: 1000 + max_batch_size: 100 + signals_per_second: 100 + metric_weight: 100 + trace_weight: 0 + log_weight: 0 + filter: + type: processor:filter + config: + metrics: + include: + match_type: strict + metric_names: + - http.server.request.count + - process.cpu.utilization + debug: + type: processor:debug + config: + verbosity: detailed + mode: signal + noop: + type: exporter:noop + config: + + connections: + - from: receiver + to: filter + - from: filter + to: debug + - from: debug + to: noop 
diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/README.md b/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/README.md index bd029931f4..a6a2f51b46 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/README.md +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/README.md @@ -10,6 +10,16 @@ For reference please the golang version of the filter processor. ```yaml config: + metrics: + include: + match_type: strict + metric_names: + - http.server.request.count + - process.cpu.utilization + exclude: + match_type: regexp + metric_names: + - ^internal\..*$ logs: include: match_type: strict @@ -77,7 +87,15 @@ config: value: false ``` -Currently we don't support metric filtering +For a runnable metric-name filter pipeline, see +[`configs/fake-metric-filter-debug-noop.yaml`](../../../../configs/fake-metric-filter-debug-noop.yaml). + +### Metrics + +To filter metrics, define `metrics.include` or `metrics.exclude` with a +`match_type` and `metric_names`. Supported `match_type` values are `strict` and +`regexp`. When both `include` and `exclude` are defined, include filtering runs +first, and exclude filtering is applied to that result. ### Logs diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/config.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/config.rs index e7dc370c7d..13aa5804f4 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/config.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/config.rs @@ -4,13 +4,14 @@ //! Implementation of the configuration of the filter processor //! 
-use otap_df_pdata::otap::filter::{logs::LogFilter, traces::TraceFilter};
+use otap_df_pdata::otap::filter::{logs::LogFilter, metrics::MetricFilter, traces::TraceFilter};
 use serde::Deserialize;
 
 #[derive(Debug, Clone, Deserialize)]
 pub struct Config {
-    // ToDo: add metrics
+    #[serde(default = "default_metric_filter")]
+    metrics: MetricFilter,
     #[serde(default = "default_log_filter")]
     logs: LogFilter,
     #[serde(default = "default_trace_filter")]
@@ -22,6 +23,11 @@ const fn default_log_filter() -> LogFilter {
     LogFilter::new(None, None, vec![])
 }
 
+/// create empty metric filter as default value
+const fn default_metric_filter() -> MetricFilter {
+    MetricFilter::new(None, None)
+}
+
 /// create empty trace filter as default value
 const fn default_trace_filter() -> TraceFilter {
     TraceFilter::new(None, None)
@@ -29,7 +35,28 @@ const fn default_trace_filter() -> TraceFilter {
 
 impl Config {
     pub const fn new(logs: LogFilter, traces: TraceFilter) -> Self {
-        Self { logs, traces }
+        Self {
+            metrics: MetricFilter::new(None, None),
+            logs,
+            traces,
+        }
+    }
+
+    pub const fn new_with_metrics(
+        metrics: MetricFilter,
+        logs: LogFilter,
+        traces: TraceFilter,
+    ) -> Self {
+        Self {
+            metrics,
+            logs,
+            traces,
+        }
+    }
+
+    #[must_use]
+    pub const fn metric_filters(&self) -> &MetricFilter {
+        &self.metrics
     }
 
     #[must_use]
diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/metrics.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/metrics.rs
index 69add3e351..4aa5eaf6a5 100644
--- a/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/metrics.rs
+++ b/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/metrics.rs
@@ -12,6 +12,9 @@ pub struct FilterPdataMetrics {
     /// Number of log signals consumed
     #[metric(unit = "{log}")]
     pub log_signals_consumed: Counter<u64>,
+    /// Number of metric signals consumed
+    #[metric(unit = "{metric}")]
+    pub metric_signals_consumed: Counter<u64>,
     /// Number of span signals consumed
     #[metric(unit = "{span}")]
     pub span_signals_consumed: Counter<u64>,
@@ -19,6 +22,9 @@ pub struct FilterPdataMetrics {
     /// Number of log signals filtered
     #[metric(unit = "{log}")]
     pub log_signals_filtered: Counter<u64>,
+    /// Number of metric signals filtered
+    #[metric(unit = "{metric}")]
+    pub metric_signals_filtered: Counter<u64>,
     /// Number of span signals filtered
     #[metric(unit = "{span}")]
     pub span_signals_filtered: Counter<u64>,
diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/mod.rs
index f24c2803c3..3fe710f136 100644
--- a/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/mod.rs
+++ b/rust/otap-dataflow/crates/core-nodes/src/processors/filter_processor/mod.rs
@@ -28,6 +28,7 @@ use otap_df_engine::processor::ProcessorWrapper;
 use otap_df_otap::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata};
 use otap_df_pdata::TryIntoWithOptions;
 use otap_df_pdata::otap::OtapArrowRecords;
+use otap_df_pdata::otap::filter::IdBitmapPool;
 use otap_df_telemetry::metrics::MetricSet;
 use serde_json::Value;
 use std::sync::Arc;
@@ -42,6 +43,11 @@ pub struct FilterProcessor {
     config: Config,
     metrics: MetricSet<FilterPdataMetrics>,
     compute_duration: ComputeDuration,
+    /// Reusable paged-bitmap pool for filtering metric child batches across
+    /// successive `Message::PData` calls. Storing the pool on the processor
+    /// (rather than allocating one per call) preserves the page allocations
+    /// that `IdBitmap::clear` would otherwise re-create on every batch.
+    metric_id_pool: IdBitmapPool,
 }
 
 /// Factory function to create a FilterProcessor.
@@ -88,6 +94,7 @@ impl FilterProcessor { config, metrics, compute_duration, + metric_id_pool: IdBitmapPool::new(), } } @@ -103,6 +110,7 @@ impl FilterProcessor { config, metrics, compute_duration, + metric_id_pool: IdBitmapPool::new(), }) } } @@ -141,8 +149,20 @@ impl local::Processor for FilterProcessor { effect_handler.timed(&self.compute_duration, || -> Result<_, Error> { match signal { SignalType::Metrics => { - // ToDo: Add support for metrics - Ok((arrow_records, 0, 0)) + let (filtered, consumed, filtered_count) = self + .config + .metric_filters() + .filter(arrow_records, &mut self.metric_id_pool) + .map_err(|e| { + let source_detail = format_error_sources(&e); + Error::ProcessorError { + processor: effect_handler.processor_id(), + kind: ProcessorErrorKind::Other, + error: format!("Filter error: {e}"), + source_detail, + } + })?; + Ok((filtered, consumed, filtered_count)) } SignalType::Logs => { let (filtered, consumed, filtered_count) = @@ -178,7 +198,10 @@ impl local::Processor for FilterProcessor { })?; match signal { - SignalType::Metrics => {} + SignalType::Metrics => { + self.metrics.metric_signals_consumed.add(signals_consumed); + self.metrics.metric_signals_filtered.add(signals_filtered); + } SignalType::Logs => { self.metrics.log_signals_consumed.add(signals_consumed); self.metrics.log_signals_filtered.add(signals_filtered); @@ -219,11 +242,16 @@ mod tests { use otap_df_pdata::otap::filter::{ AnyValue as AnyValueFilter, KeyValue as KeyValueFilter, MatchType, logs::{LogFilter, LogMatchProperties, LogSeverityNumberMatchProperties}, + metrics::{MetricFilter, MetricMatchProperties}, traces::{TraceFilter, TraceMatchProperties}, }; use otap_df_pdata::proto::opentelemetry::{ common::v1::{AnyValue, InstrumentationScope, KeyValue}, logs::v1::{LogRecord, LogsData, ResourceLogs, ScopeLogs, SeverityNumber}, + metrics::v1::{ + AggregationTemporality, Metric, MetricsData, NumberDataPoint, ResourceMetrics, + ScopeMetrics, Sum, + }, resource::v1::Resource, 
         trace::v1::{
             ResourceSpans, ScopeSpans, Span, Status, TracesData,
@@ -233,6 +261,7 @@ mod tests {
     };
     use otap_df_telemetry::registry::TelemetryRegistryHandle;
     use prost::Message as _;
+    use serde_json::json;
     use std::future::Future;
     use std::pin::Pin;
     use std::sync::Arc;
@@ -643,6 +672,39 @@ mod tests {
         ])
     }
 
+    fn build_metrics(names: &[&str]) -> MetricsData {
+        build_metrics_with_indices(&names.iter().copied().enumerate().collect::<Vec<_>>())
+    }
+
+    fn build_metrics_with_indices(names: &[(usize, &str)]) -> MetricsData {
+        MetricsData::new(vec![ResourceMetrics::new(
+            Resource::default(),
+            vec![ScopeMetrics::new(
+                InstrumentationScope::build()
+                    .name("scope".to_string())
+                    .finish(),
+                names
+                    .iter()
+                    .map(|&(index, name)| {
+                        Metric::build()
+                            .name(name)
+                            .data_sum(Sum::new(
+                                AggregationTemporality::Cumulative,
+                                true,
+                                vec![
+                                    NumberDataPoint::build()
+                                        .time_unix_nano(1000u64 + index as u64)
+                                        .value_int(index as i64)
+                                        .finish(),
+                                ],
+                            ))
+                            .finish()
+                    })
+                    .collect::<Vec<_>>(),
+            )],
+        )])
+    }
+
     /// Validation closure that checks the outputted data
     fn validation_procedure() -> impl FnOnce(ValidateContext) -> Pin<Box<dyn Future<Output = ()>>> {
         |mut _ctx| Box::pin(async move {})
     }
@@ -682,6 +744,42 @@ mod tests {
         }
     }
 
+    /// Test closure that simulates a typical metrics processor scenario.
+    fn scenario_metrics(
+        sent: MetricsData,
+        expected: MetricsData,
+    ) -> impl FnOnce(TestContext) -> Pin<Box<dyn Future<Output = ()>>> {
+        move |mut ctx| {
+            Box::pin(async move {
+                let mut bytes = vec![];
+                sent.encode(&mut bytes)
+                    .expect("failed to encode metrics data into bytes");
+                let otlp_metrics_bytes = OtapPdata::new_default(
+                    OtlpProtoBytes::ExportMetricsRequest(bytes.into()).into(),
+                );
+                ctx.process(Message::PData(otlp_metrics_bytes))
+                    .await
+                    .expect("failed to process");
+                let msgs = ctx.drain_pdata().await;
+                assert_eq!(msgs.len(), 1);
+                let received_metrics_data = &msgs[0];
+                let (_, payload) = received_metrics_data.clone().into_parts();
+                let otlp_bytes: OtlpProtoBytes = payload
+                    .try_into_with_default()
+                    .expect("failed to convert to OtlpProtoBytes");
+                let received_metrics_data = match otlp_bytes {
+                    OtlpProtoBytes::ExportMetricsRequest(bytes) => {
+                        MetricsData::decode(bytes.as_ref())
+                            .expect("failed to decode metrics into metricsdata")
+                    }
+                    _ => panic!("expected metrics type"),
+                };
+
+                assert_eq!(received_metrics_data, expected);
+            })
+        }
+    }
+
     /// Test closure that simulates a typical processor scenario.
fn scenario_traces( expected: TracesData, @@ -849,6 +947,73 @@ mod tests { .validate(validation_procedure()); } + #[test] + fn test_filter_processor_metrics_strict_include_only() { + let test_runtime = TestRuntime::new(); + + let metric_props = MetricMatchProperties::new( + MatchType::Strict, + vec!["test.counter1".into(), "test.counter3".into()], + ); + let metric_filter = MetricFilter::new(Some(metric_props), None); + let log_filter = LogFilter::new(None, None, Vec::new()); + let trace_filter = TraceFilter::new(None, None); + + let config = Config::new_with_metrics(metric_filter, log_filter, trace_filter); + let user_config = Arc::new(NodeUserConfig::new_processor_config(FILTER_PROCESSOR_URN)); + let telemetry_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(telemetry_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0); + let processor = ProcessorWrapper::local( + FilterProcessor::new(config, pipeline_ctx), + test_node(test_runtime.config().name.clone()), + user_config, + test_runtime.config(), + ); + + let sent = build_metrics(&["test.counter1", "test.counter2", "test.counter3"]); + let expected = build_metrics_with_indices(&[(0, "test.counter1"), (2, "test.counter3")]); + + test_runtime + .set_processor(processor) + .run_test(scenario_metrics(sent, expected)) + .validate(validation_procedure()); + } + + #[test] + fn test_filter_processor_metrics_include_metric_names_from_config() { + let test_runtime = TestRuntime::new(); + + let user_config = Arc::new(NodeUserConfig::new_processor_config(FILTER_PROCESSOR_URN)); + let telemetry_registry_handle = TelemetryRegistryHandle::new(); + let controller_ctx = ControllerContext::new(telemetry_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 1, 0); + let config = json!({ + "metrics": { + "include": { + "match_type": "strict", + "metric_names": 
["test.counter1", "test.counter3"] + } + } + }); + let processor = ProcessorWrapper::local( + FilterProcessor::from_config(pipeline_ctx, &config).expect("config should parse"), + test_node(test_runtime.config().name.clone()), + user_config, + test_runtime.config(), + ); + + let sent = build_metrics(&["test.counter1", "test.counter2", "test.counter3"]); + let expected = build_metrics_with_indices(&[(0, "test.counter1"), (2, "test.counter3")]); + + test_runtime + .set_processor(processor) + .run_test(scenario_metrics(sent, expected)) + .validate(validation_procedure()); + } + #[test] fn test_filter_processor_logs_regex() { let test_runtime = TestRuntime::new(); diff --git a/rust/otap-dataflow/crates/pdata/src/otap/filter.rs b/rust/otap-dataflow/crates/pdata/src/otap/filter.rs index fee37cc8bf..95331116a5 100644 --- a/rust/otap-dataflow/crates/pdata/src/otap/filter.rs +++ b/rust/otap-dataflow/crates/pdata/src/otap/filter.rs @@ -20,6 +20,7 @@ use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; pub mod logs; +pub mod metrics; pub mod traces; // threshold numbers to determine which method to use for building id filter // ToDo: determine optimimal numbers diff --git a/rust/otap-dataflow/crates/pdata/src/otap/filter/metrics.rs b/rust/otap-dataflow/crates/pdata/src/otap/filter/metrics.rs new file mode 100644 index 0000000000..af6e8d45cb --- /dev/null +++ b/rust/otap-dataflow/crates/pdata/src/otap/filter/metrics.rs @@ -0,0 +1,267 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Metric filtering support for OTAP batches. 
+
+use crate::arrays::get_required_array;
+use crate::otap::OtapArrowRecords;
+use crate::otap::error::{Error, Result};
+use crate::otap::filter::{
+    IdBitmapPool, MatchType, default_match_type, filter_otap_batch, nulls_to_false,
+    regex_match_column,
+};
+use crate::schema::consts;
+use arrow::array::{BooleanArray, StringArray};
+use arrow::buffer::BooleanBuffer;
+use serde::Deserialize;
+
+/// Overall requirements to use when filtering metrics.
+#[derive(Debug, Clone, Deserialize)]
+pub struct MetricFilter {
+    /// Include match properties describe metrics that should be included in the pipeline.
+    /// If both include and exclude are specified, include filtering occurs first.
+    include: Option<MetricMatchProperties>,
+    /// Exclude match properties describe metrics that should be excluded from the pipeline.
+    /// If both include and exclude are specified, include filtering occurs first.
+    exclude: Option<MetricMatchProperties>,
+}
+
+/// Set of metric properties to match against.
+#[derive(Debug, Clone, Deserialize)]
+pub struct MetricMatchProperties {
+    /// MatchType specifies the type of matching desired.
+    #[serde(default = "default_match_type")]
+    match_type: MatchType,
+
+    /// MetricNames is a list of metric names that the metric's name field must match against.
+    #[serde(default)]
+    metric_names: Vec<String>,
+}
+
+impl MetricFilter {
+    /// Create a new metric filter.
+    #[must_use]
+    pub const fn new(
+        include: Option<MetricMatchProperties>,
+        exclude: Option<MetricMatchProperties>,
+    ) -> Self {
+        Self { include, exclude }
+    }
+
+    /// Take a metrics payload and return the filtered result.
+    ///
+    /// `pool` is borrowed from the caller so that paged-bitmap allocations are reused
+    /// across batches. The filter itself does not own the pool because it is `&self`
+    /// (potentially shared); the caller (typically the processor) is the natural owner
+    /// of the per-instance reuse state. See [`crate::otap::filter::IdBitmapPool`].
+    ///
+    /// Returns tuple of (filtered batch, metrics_consumed, metrics_filtered).
+    pub fn filter(
+        &self,
+        metrics_payload: OtapArrowRecords,
+        pool: &mut IdBitmapPool,
+    ) -> Result<(OtapArrowRecords, u64, u64)> {
+        let payload_type = metrics_payload.root_payload_type();
+        let metrics = metrics_payload
+            .root_record_batch()
+            .ok_or_else(|| Error::RecordBatchNotFound { payload_type })?;
+        let num_rows = metrics.num_rows() as u64;
+
+        let metric_filter = if let Some(include_config) = &self.include
+            && let Some(exclude_config) = &self.exclude
+        {
+            let include_filter = include_config.create_filter(&metrics_payload, false)?;
+            let exclude_filter = exclude_config.create_filter(&metrics_payload, true)?;
+            arrow::compute::and_kleene(&include_filter, &exclude_filter)
+                .map_err(|e| Error::ColumnLengthMismatch { source: e })?
+        } else if self.include.is_none()
+            && let Some(exclude_config) = &self.exclude
+        {
+            exclude_config.create_filter(&metrics_payload, true)?
+        } else if let Some(include_config) = &self.include
+            && self.exclude.is_none()
+        {
+            include_config.create_filter(&metrics_payload, false)?
+        } else {
+            return Ok((metrics_payload, num_rows, 0));
+        };
+
+        let filtered_count = num_rows - metric_filter.true_count() as u64;
+        let filtered = filter_otap_batch(&metric_filter, &metrics_payload, pool)?;
+        Ok((filtered, num_rows, filtered_count))
+    }
+}
+
+impl MetricMatchProperties {
+    /// Create a new metric match properties value.
+    #[must_use]
+    pub const fn new(match_type: MatchType, metric_names: Vec<String>) -> Self {
+        Self {
+            match_type,
+            metric_names,
+        }
+    }
+
+    /// Create a filter for the metrics root record batch.
+    pub fn create_filter(
+        &self,
+        metrics_payload: &OtapArrowRecords,
+        invert: bool,
+    ) -> Result<BooleanArray> {
+        let metrics =
+            metrics_payload
+                .root_record_batch()
+                .ok_or_else(|| Error::RecordBatchNotFound {
+                    payload_type: metrics_payload.root_payload_type(),
+                })?;
+        let num_rows = metrics.num_rows();
+
+        if self.metric_names.is_empty() {
+            return Ok(BooleanArray::from(BooleanBuffer::new_set(num_rows)));
+        }
+
+        let names_column = get_required_array(metrics, consts::NAME)?;
+        let mut filter = BooleanArray::from(BooleanBuffer::new_unset(num_rows));
+        for name in &self.metric_names {
+            let name_filter = match self.match_type {
+                MatchType::Regexp => regex_match_column(names_column, name)?,
+                MatchType::Strict => {
+                    let value_scalar = StringArray::new_scalar(name);
+                    arrow::compute::kernels::cmp::eq(&names_column, &value_scalar)
+                        .expect("can compare string name column to string scalar")
+                }
+            };
+            filter = arrow::compute::or_kleene(&name_filter, &filter)
+                .map_err(|e| Error::ColumnLengthMismatch { source: e })?;
+        }
+
+        let filter = nulls_to_false(&filter);
+        if invert {
+            Ok(arrow::compute::not(&filter).expect("not doesn't fail"))
+        } else {
+            Ok(filter)
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::otap::filter::MatchType;
+    use crate::proto::OtlpProtoMessage;
+    use crate::proto::opentelemetry::common::v1::InstrumentationScope;
+    use crate::proto::opentelemetry::metrics::v1::{
+        AggregationTemporality, Metric, MetricsData, NumberDataPoint, ResourceMetrics,
+        ScopeMetrics, Sum,
+    };
+    use crate::proto::opentelemetry::resource::v1::Resource;
+    use crate::testing::equiv::assert_equivalent;
+    use crate::testing::round_trip::{otap_to_otlp, otlp_to_otap};
+
+    fn build_metrics(names: &[&str]) -> MetricsData {
+        build_metrics_with_indices(&names.iter().copied().enumerate().collect::<Vec<_>>())
+    }
+
+    fn build_metrics_with_indices(names: &[(usize, &str)]) -> MetricsData {
+        MetricsData::new(vec![ResourceMetrics::new(
+            Resource::default(),
+            vec![ScopeMetrics::new(
+                InstrumentationScope::build()
+                    .name("scope".to_string())
+                    .finish(),
+                names
+                    .iter()
+                    .map(|&(index, name)| {
+                        Metric::build()
+                            .name(name)
+                            .data_sum(Sum::new(
+                                AggregationTemporality::Cumulative,
+                                true,
+                                vec![
+                                    NumberDataPoint::build()
+                                        .time_unix_nano(1000u64 + index as u64)
+                                        .value_int(index as i64)
+                                        .finish(),
+                                ],
+                            ))
+                            .finish()
+                    })
+                    .collect::<Vec<_>>(),
+            )],
+        )])
+    }
+
+    #[test]
+    fn test_filter_include_metric_names() {
+        let include = MetricMatchProperties::new(
+            MatchType::Strict,
+            vec!["test.counter1".into(), "test.counter3".into()],
+        );
+        let filter = MetricFilter::new(Some(include), None);
+        let mut pool = IdBitmapPool::new();
+
+        let input = otlp_to_otap(&OtlpProtoMessage::Metrics(build_metrics(&[
+            "test.counter1",
+            "test.counter2",
+            "test.counter3",
+        ])));
+
+        let (result, metrics_consumed, metrics_filtered) = filter.filter(input, &mut pool).unwrap();
+        assert_eq!(metrics_consumed, 3);
+        assert_eq!(metrics_filtered, 1);
+
+        let expected = otlp_to_otap(&OtlpProtoMessage::Metrics(build_metrics_with_indices(&[
+            (0, "test.counter1"),
+            (2, "test.counter3"),
+        ])));
+
+        assert_equivalent(&[otap_to_otlp(&result)], &[otap_to_otlp(&expected)]);
+    }
+
+    #[test]
+    fn test_filter_exclude_metric_names() {
+        let exclude = MetricMatchProperties::new(MatchType::Strict, vec!["test.counter2".into()]);
+        let filter = MetricFilter::new(None, Some(exclude));
+        let mut pool = IdBitmapPool::new();
+
+        let input = otlp_to_otap(&OtlpProtoMessage::Metrics(build_metrics(&[
+            "test.counter1",
+            "test.counter2",
+            "test.counter3",
+        ])));
+
+        let (result, metrics_consumed, metrics_filtered) = filter.filter(input, &mut pool).unwrap();
+        assert_eq!(metrics_consumed, 3);
+        assert_eq!(metrics_filtered, 1);
+
+        let expected = otlp_to_otap(&OtlpProtoMessage::Metrics(build_metrics_with_indices(&[
+            (0, "test.counter1"),
+            (2, "test.counter3"),
+        ])));
+
+        assert_equivalent(&[otap_to_otlp(&result)], &[otap_to_otlp(&expected)]);
+    }
+
#[test] + fn test_filter_metric_names_regex() { + let include = MetricMatchProperties::new(MatchType::Regexp, vec![r"^aio_.*_count$".into()]); + let filter = MetricFilter::new(Some(include), None); + let mut pool = IdBitmapPool::new(); + + let input = otlp_to_otap(&OtlpProtoMessage::Metrics(build_metrics(&[ + "aio_akri_count", + "aio_akri_latency", + "other_count", + ]))); + + let (result, metrics_consumed, metrics_filtered) = filter.filter(input, &mut pool).unwrap(); + assert_eq!(metrics_consumed, 3); + assert_eq!(metrics_filtered, 2); + + let expected = otlp_to_otap(&OtlpProtoMessage::Metrics(build_metrics(&[ + "aio_akri_count", + ]))); + + assert_equivalent(&[otap_to_otlp(&result)], &[otap_to_otlp(&expected)]); + } +}