diff --git a/Cargo.toml b/Cargo.toml index e2197605..629c1ec9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ procfs = { version = "^0.14", optional = true, default-features = false } criterion = "0.4" getopts = "^0.2" hyper = { version = "^0.14", features = ["server", "http1", "tcp"] } +mock_instant = { version = "0.2", features = ["sync"] } tokio = { version = "^1.0", features = ["macros", "rt-multi-thread"] } [build-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 7991d8ff..df3cfb1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -157,7 +157,9 @@ mod encoder; mod errors; mod gauge; mod histogram; +mod maximum_over_interval_gauge; mod metrics; +mod pulling_gauge; #[cfg(feature = "push")] mod push; mod registry; @@ -218,7 +220,9 @@ pub use self::gauge::{Gauge, GaugeVec, IntGauge, IntGaugeVec}; pub use self::histogram::DEFAULT_BUCKETS; pub use self::histogram::{exponential_buckets, linear_buckets}; pub use self::histogram::{Histogram, HistogramOpts, HistogramTimer, HistogramVec}; +pub use self::maximum_over_interval_gauge::MaximumOverIntervalGauge; pub use self::metrics::Opts; +pub use self::pulling_gauge::PullingGauge; #[cfg(feature = "push")] pub use self::push::{ hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics, diff --git a/src/maximum_over_interval_gauge.rs b/src/maximum_over_interval_gauge.rs new file mode 100644 index 00000000..26ee3793 --- /dev/null +++ b/src/maximum_over_interval_gauge.rs @@ -0,0 +1,212 @@ +use parking_lot::{RwLock, RwLockUpgradableReadGuard}; +use std::{ + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, + time::Duration, +}; + +#[cfg(test)] +use mock_instant::Instant; + +#[cfg(not(test))] +use std::time::Instant; + +use crate::{core::Collector, Error, PullingGauge}; + +/// A prometheus gauge that exposes the maximum value of a gauge over an interval. +/// +/// Used to expose instantaneous values that tend to move a lot within a small interval. +/// +/// # Examples +/// ``` +/// # use std::time::Duration; +/// # use prometheus::{Registry, MaximumOverIntervalGauge}; +/// +/// let registry = Registry::new(); +/// let gauge = MaximumOverIntervalGauge::new( +/// "maximum_queue_size_30s", +/// "The high watermark queue size in the last 30 seconds.", +/// Duration::from_secs(30) +/// ).unwrap(); +/// registry.register(Box::new(gauge.clone())); +/// +/// gauge.add(30); +/// gauge.sub(10); +/// +/// // For the next 30 seconds, the metric will be 30 as that was the maximum value. +/// // Afterwards, it will drop to 10. +/// ``` +#[derive(Clone, Debug)] +pub struct MaximumOverIntervalGauge { + // The current real-time value. + value: Arc, + // The maximum value in the current interval. + maximum_value: Arc, + + // The length of a given interval. + interval_duration: Duration, + // The time at which the current interval will expose. + interval_expiry: Arc>, + + gauge: PullingGauge, +} + +impl MaximumOverIntervalGauge { + /// Create a new [`MaximumOverIntervalGauge`]. + pub fn new, S2: Into>( + name: S1, + help: S2, + interval: Duration, + ) -> Result { + let maximum_value = Arc::new(AtomicI64::new(0)); + + Ok(Self { + value: Arc::new(AtomicI64::new(0)), + maximum_value: maximum_value.clone(), + + interval_expiry: Arc::new(RwLock::new(Instant::now() + interval)), + interval_duration: interval, + gauge: PullingGauge::new( + name, + help, + Box::new(move || maximum_value.load(Ordering::Relaxed) as f64), + )?, + }) + } + + /// Increments the gauge by 1. + pub fn inc(&self) { + self.apply_delta(1); + } + + /// Decrements the gauge by 1. + pub fn dec(&self) { + self.apply_delta(-1); + } + + /// Add the given value to the gauge. + /// + /// (The value can be negative, resulting in a decrement of the gauge.) + pub fn add(&self, v: i64) { + self.apply_delta(v); + } + + /// Subtract the given value from the gauge. + /// + /// (The value can be negative, resulting in an increment of the gauge.) + pub fn sub(&self, v: i64) { + self.apply_delta(-v); + } + + fn apply_delta(&self, delta: i64) { + let previous_value = self.value.fetch_add(delta, Ordering::Relaxed); + let new_value = previous_value + delta; + + let now = Instant::now(); + let interval_expiry = self.interval_expiry.upgradable_read(); + let loaded_interval_expiry = *interval_expiry; + + // Check whether we've crossed into the new interval. + if loaded_interval_expiry < now { + // There's a possible optimization here of using try_upgrade in a loop. Need to write + // benchmarks to verify. + let mut interval_expiry = RwLockUpgradableReadGuard::upgrade(interval_expiry); + + // Did we get to be the thread that actually started the new interval? Other threads + // could have updated the value before we got the exclusive lock. + if *interval_expiry == loaded_interval_expiry { + *interval_expiry = now + self.interval_duration; + self.maximum_value.store(new_value, Ordering::Relaxed); + + return; + } + } + + // Set the maximum_value to the max of the current value & previous max. + self.maximum_value.fetch_max(new_value, Ordering::Relaxed); + } +} + +impl Collector for MaximumOverIntervalGauge { + fn desc(&self) -> Vec<&crate::core::Desc> { + self.gauge.desc() + } + + fn collect(&self) -> Vec { + // Apply a delta of '0' to ensure that the reset-value-if-interval-expired-logic kicks in. + self.apply_delta(0); + + self.gauge.collect() + } +} + +#[cfg(test)] +mod test { + use mock_instant::MockClock; + + use super::*; + + static INTERVAL: Duration = Duration::from_secs(30); + + #[test] + fn test_correct_behaviour() { + let gauge = MaximumOverIntervalGauge::new( + "test_counter".to_string(), + "This won't help you".to_string(), + INTERVAL, + ) + .unwrap(); + + assert_metric_value(&gauge, 0.0); + + gauge.add(5); + + assert_metric_value(&gauge, 5.0); + + gauge.dec(); + + // The value should still be five after we decreased it as the max within the interval was 5. + assert_metric_value(&gauge, 5.0); + + MockClock::advance(INTERVAL + Duration::from_secs(1)); + + // The value should be 4 now as the next interval has started. + assert_metric_value(&gauge, 4.0); + } + + #[test] + fn test_cloning() { + let gauge = MaximumOverIntervalGauge::new( + "test_counter".to_string(), + "This won't help you".to_string(), + INTERVAL, + ) + .unwrap(); + + let same_gauge = gauge.clone(); + + assert_metric_value(&gauge, 0.0); + + gauge.add(5); + + // Read from the cloned gauge to veriy that they share data. + assert_metric_value(&same_gauge, 5.0); + } + + fn assert_metric_value(gauge: &MaximumOverIntervalGauge, val: f64) { + let result = gauge.collect(); + + let metric_family = result + .first() + .expect("expected one MetricFamily to be returned"); + + let metric = metric_family + .get_metric() + .first() + .expect("expected one Metric to be returned"); + + assert_eq!(val, metric.get_gauge().get_value()); + } +} diff --git a/src/pulling_gauge.rs b/src/pulling_gauge.rs new file mode 100644 index 00000000..6fbe9633 --- /dev/null +++ b/src/pulling_gauge.rs @@ -0,0 +1,101 @@ +use std::{collections::HashMap, fmt, sync::Arc}; + +use crate::{ + core::Collector, + proto::{Gauge, Metric, MetricFamily, MetricType}, +}; +use protobuf::RepeatedField; + +/// A [Gauge] that returns the value from a provided function on every collect run. +/// +/// This metric is the equivalant of Go's +/// +/// +/// # Examples +/// ``` +/// # use prometheus::{Registry, PullingGauge}; +/// # // We are stubbing out std::thread::available_parallelism since it's not available in the +/// # // oldest Rust version that we support. +/// # fn available_parallelism() -> f64 { 0.0 } +/// +/// let registry = Registry::new(); +/// let gauge = PullingGauge::new( +/// "available_parallelism", +/// "The available parallelism, usually the numbers of logical cores.", +/// Box::new(|| available_parallelism()) +/// ).unwrap(); +/// registry.register(Box::new(gauge)); +/// ``` +#[derive(Clone)] +pub struct PullingGauge { + desc: crate::core::Desc, + value: Arc f64 + Send + Sync>>, +} + +impl fmt::Debug for PullingGauge { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PullingGauge") + .field("desc", &self.desc) + .field("value", &"") + .finish() + } +} + +impl PullingGauge { + /// Create a new [`PullingGauge`]. + pub fn new, S2: Into>( + name: S1, + help: S2, + value: Box f64 + Send + Sync>, + ) -> crate::Result { + Ok(PullingGauge { + value: Arc::new(value), + desc: crate::core::Desc::new(name.into(), help.into(), Vec::new(), HashMap::new())?, + }) + } + + fn metric(&self) -> Metric { + let mut gauge = Gauge::default(); + let getter = &self.value; + gauge.set_value(getter()); + + let mut metric = Metric::default(); + metric.set_gauge(gauge); + + metric + } +} + +impl Collector for PullingGauge { + fn desc(&self) -> Vec<&crate::core::Desc> { + vec![&self.desc] + } + + fn collect(&self) -> Vec { + let mut m = MetricFamily::default(); + m.set_name(self.desc.fq_name.clone()); + m.set_help(self.desc.help.clone()); + m.set_field_type(MetricType::GAUGE); + m.set_metric(RepeatedField::from_vec(vec![self.metric()])); + vec![m] + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metrics::Collector; + + #[test] + fn test_pulling_gauge() { + const VALUE: f64 = 10.0; + + let gauge = + PullingGauge::new("test_gauge", "Purely for testing", Box::new(|| VALUE)).unwrap(); + + let metrics = gauge.collect(); + assert_eq!(metrics.len(), 1); + + assert_eq!(VALUE, metrics[0].get_metric()[0].get_gauge().get_value()); + } +}