Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion pulse-metrics/src/pipeline/processor/drop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use pulse_common::proto::yaml_to_proto;
use pulse_protobuf::protos::pulse::config::processor::v1::drop;
use regex::bytes::Regex;
use std::sync::Arc;
use std::time::{Duration, Instant};

// TODO(mattklein123): There are many performance optimizations that we can do as follow ups,
// mainly around drop rules that only use name matches.
Expand Down Expand Up @@ -65,6 +66,7 @@ enum TranslatedDropCondition {
ValueMatch(ValueMatch),
AndMatch(Vec<TranslatedDropCondition>),
NotMatch(Box<TranslatedDropCondition>),
TimestampAgeMatch(Duration),
}

impl TranslatedDropCondition {
Expand Down Expand Up @@ -103,6 +105,9 @@ impl TranslatedDropCondition {
let translated_condition = Self::new(not_match.as_ref())?;
Ok(Self::NotMatch(Box::new(translated_condition)))
},
Condition_type::TimestampAgeMatch(timestamp_age_match) => Ok(Self::TimestampAgeMatch(
Duration::from_secs(timestamp_age_match.max_age_seconds),
)),
}
}

Expand All @@ -127,6 +132,19 @@ impl TranslatedDropCondition {
}
}

fn is_timestamp_age_match(sample: &ParsedMetric, max_age: Duration) -> bool {
let current_timestamp = crate::protos::metric::default_timestamp();
let metric_timestamp = sample.metric().timestamp;

// If the metric timestamp is in the future, don't drop it.
if metric_timestamp > current_timestamp {
return false;
}

let age_seconds = current_timestamp - metric_timestamp;
age_seconds > max_age.as_secs()
}

fn drop_sample(&self, sample: &ParsedMetric) -> bool {
match self {
Self::MetricName(value_match) => value_match.is_match(sample.metric().get_id().name()),
Expand All @@ -144,6 +162,7 @@ impl TranslatedDropCondition {
.iter()
.all(|condition| condition.drop_sample(sample)),
Self::NotMatch(condition) => !condition.drop_sample(sample),
Self::TimestampAgeMatch(max_age) => Self::is_timestamp_age_match(sample, *max_age),
}
}
}
Expand All @@ -157,10 +176,18 @@ struct TranslatedDropRule {
mode: DropMode,
conditions: Vec<TranslatedDropCondition>,
drop_counter: IntCounter,
warn_interval: Option<Duration>,
last_warning_time: parking_lot::Mutex<Option<Instant>>,
}

impl TranslatedDropRule {
fn new(rule: &DropRule, scope: &Scope) -> anyhow::Result<Self> {
let warn_interval = if rule.warn_interval_seconds > 0 {
Some(Duration::from_secs(rule.warn_interval_seconds))
} else {
None
};

Ok(Self {
name: rule.name.to_string(),
mode: rule.mode.enum_value_or_default(),
Expand All @@ -179,6 +206,8 @@ impl TranslatedDropRule {
}
},
),
warn_interval,
last_warning_time: parking_lot::Mutex::new(None),
})
}

Expand All @@ -194,7 +223,25 @@ impl TranslatedDropRule {
.iter()
.any(|condition| condition.drop_sample(sample));
if drop {
log::debug!(
// Determine the log level based on warning interval configuration
let log_level = self
.warn_interval
.map_or(log::Level::Debug, |warn_interval| {
let mut last_warning = self.last_warning_time.lock();
let now = Instant::now();
let should_warn =
last_warning.is_none_or(|last| now.duration_since(last) >= warn_interval);

if should_warn {
*last_warning = Some(now);
log::Level::Warn
} else {
log::Level::Debug
}
});

log::log!(
log_level,
"dropping sample {} for rule {} mode {:?}",
sample.metric(),
self.name,
Expand Down
247 changes: 247 additions & 0 deletions pulse-metrics/src/pipeline/processor/drop/mod_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use drop::drop_rule::{
SimpleValueMatch,
StringMatch,
TagMatch,
TimestampAgeMatch,
ValueMatch,
ValueMatchOperator,
};
Expand Down Expand Up @@ -105,6 +106,16 @@ fn make_and_match(conditions: Vec<DropCondition>) -> DropCondition {
}
}

fn make_timestamp_age_match(max_age_seconds: u64) -> DropCondition {
DropCondition {
condition_type: Some(Condition_type::TimestampAgeMatch(TimestampAgeMatch {
max_age_seconds,
..Default::default()
})),
..Default::default()
}
}

#[tokio::test]
async fn regex_vs_exact() {
let (mut helper, context) = processor_factory_context_for_test();
Expand Down Expand Up @@ -280,3 +291,239 @@ async fn all() {
&labels! { "rule_name" => "rule2", "mode" => "testing" },
);
}

#[tokio::test]
async fn timestamp_age_match() {
let (mut helper, context) = processor_factory_context_for_test();
let processor = Arc::new(
DropProcessor::new(
DropProcessorConfig {
config_source: Some(Config_source::Inline(DropConfig {
rules: vec![DropRule {
name: "rule1".into(),
mode: DropMode::ENABLED.into(),
conditions: vec![make_timestamp_age_match(3600)],
..Default::default()
}],
..Default::default()
})),
..Default::default()
},
context,
)
.await
.unwrap(),
);

let current_time = crate::protos::metric::default_timestamp();
let old_timestamp = current_time - 7200; // 2 hours old
let recent_timestamp = current_time - 1800; // 30 minutes old
let future_timestamp = current_time + 3600; // 1 hour in the future

make_mut(&mut helper.dispatcher)
.expect_send()
.times(1)
.returning(|metrics| {
assert_eq!(metrics.len(), 2);
});
processor
.clone()
.recv_samples(vec![
make_metric("old_metric", &[], old_timestamp),
make_metric("recent_metric", &[], recent_timestamp),
make_metric("future_metric", &[], future_timestamp),
])
.await;
helper.stats_helper.assert_counter_eq(
1,
"processor:dropped",
&labels! { "rule_name" => "rule1", "mode" => "enabled" },
);
}

#[tokio::test]
async fn timestamp_age_match_with_and() {
let (mut helper, context) = processor_factory_context_for_test();
let processor = Arc::new(
DropProcessor::new(
DropProcessorConfig {
config_source: Some(Config_source::Inline(DropConfig {
rules: vec![DropRule {
name: "rule1".into(),
mode: DropMode::ENABLED.into(),
conditions: vec![make_and_match(vec![
make_exact_match("test_metric"),
make_timestamp_age_match(3600),
])],
..Default::default()
}],
..Default::default()
})),
..Default::default()
},
context,
)
.await
.unwrap(),
);

let current_time = crate::protos::metric::default_timestamp();
let old_timestamp = current_time - 7200; // 2 hours old
let recent_timestamp = current_time - 1800; // 30 minutes old

make_mut(&mut helper.dispatcher)
.expect_send()
.times(1)
.returning(|metrics| {
assert_eq!(metrics.len(), 3);
});
processor
.clone()
.recv_samples(vec![
make_metric("test_metric", &[], old_timestamp),
make_metric("test_metric", &[], recent_timestamp),
make_metric("other_metric", &[], old_timestamp),
make_metric("other_metric", &[], recent_timestamp),
])
.await;
helper.stats_helper.assert_counter_eq(
1,
"processor:dropped",
&labels! { "rule_name" => "rule1", "mode" => "enabled" },
);
}

#[tokio::test]
async fn timestamp_age_match_with_not() {
let (mut helper, context) = processor_factory_context_for_test();
let processor = Arc::new(
DropProcessor::new(
DropProcessorConfig {
config_source: Some(Config_source::Inline(DropConfig {
rules: vec![DropRule {
name: "rule1".into(),
mode: DropMode::ENABLED.into(),
conditions: vec![make_not_match(make_timestamp_age_match(3600))],
..Default::default()
}],
..Default::default()
})),
..Default::default()
},
context,
)
.await
.unwrap(),
);

let current_time = crate::protos::metric::default_timestamp();
let old_timestamp = current_time - 7200; // 2 hours old
let recent_timestamp = current_time - 1800; // 30 minutes old

make_mut(&mut helper.dispatcher)
.expect_send()
.times(1)
.returning(|metrics| {
assert_eq!(metrics.len(), 1);
});
processor
.clone()
.recv_samples(vec![
make_metric("old_metric", &[], old_timestamp),
make_metric("recent_metric", &[], recent_timestamp),
])
.await;
helper.stats_helper.assert_counter_eq(
1,
"processor:dropped",
&labels! { "rule_name" => "rule1", "mode" => "enabled" },
);
}

#[tokio::test]
async fn timestamp_age_match_testing_mode() {
let (mut helper, context) = processor_factory_context_for_test();
let processor = Arc::new(
DropProcessor::new(
DropProcessorConfig {
config_source: Some(Config_source::Inline(DropConfig {
rules: vec![DropRule {
name: "rule1".into(),
mode: DropMode::TESTING.into(),
conditions: vec![make_timestamp_age_match(3600)],
..Default::default()
}],
..Default::default()
})),
..Default::default()
},
context,
)
.await
.unwrap(),
);

let current_time = crate::protos::metric::default_timestamp();
let old_timestamp = current_time - 7200; // 2 hours old

make_mut(&mut helper.dispatcher)
.expect_send()
.times(1)
.returning(|metrics| {
// In testing mode, nothing should be dropped
assert_eq!(metrics.len(), 1);
});
processor
.clone()
.recv_samples(vec![make_metric("old_metric", &[], old_timestamp)])
.await;
helper.stats_helper.assert_counter_eq(
1,
"processor:dropped",
&labels! { "rule_name" => "rule1", "mode" => "testing" },
);
}

#[tokio::test]
async fn warn_interval() {
let (mut helper, context) = processor_factory_context_for_test();
let processor = Arc::new(
DropProcessor::new(
DropProcessorConfig {
config_source: Some(Config_source::Inline(DropConfig {
rules: vec![DropRule {
name: "rule1".into(),
mode: DropMode::ENABLED.into(),
conditions: vec![make_exact_match("drop_this")],
warn_interval_seconds: 60,
..Default::default()
}],
..Default::default()
})),
..Default::default()
},
context,
)
.await
.unwrap(),
);

make_mut(&mut helper.dispatcher)
.expect_send()
.times(1)
.returning(|metrics| {
assert_eq!(metrics.len(), 1);
});
processor
.clone()
.recv_samples(vec![
make_metric("drop_this", &[], 0),
make_metric("keep_this", &[], 0),
])
.await;
helper.stats_helper.assert_counter_eq(
1,
"processor:dropped",
&labels! { "rule_name" => "rule1", "mode" => "enabled" },
);
}
Loading
Loading