-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add MaximumOverIntervalGauge #469
Open
JanBerktold
wants to merge
2
commits into
tikv:master
Choose a base branch
from
JanBerktold:add-maximum-in-interval-counter
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
use parking_lot::{RwLock, RwLockUpgradableReadGuard}; | ||
use std::{ | ||
sync::{ | ||
atomic::{AtomicI64, Ordering}, | ||
Arc, | ||
}, | ||
time::Duration, | ||
}; | ||
|
||
#[cfg(test)] | ||
use mock_instant::Instant; | ||
|
||
#[cfg(not(test))] | ||
use std::time::Instant; | ||
|
||
use crate::{core::Collector, Error, PullingGauge}; | ||
|
||
/// A prometheus gauge that exposes the maximum value of a gauge over an interval. | ||
/// | ||
/// Used to expose instantaneous values that tend to move a lot within a small interval. | ||
/// | ||
/// # Examples | ||
/// ``` | ||
/// # use std::time::Duration; | ||
/// # use prometheus::{Registry, MaximumOverIntervalGauge}; | ||
/// | ||
/// let registry = Registry::new(); | ||
/// let gauge = MaximumOverIntervalGauge::new( | ||
/// "maximum_queue_size_30s", | ||
/// "The high watermark queue size in the last 30 seconds.", | ||
/// Duration::from_secs(30) | ||
/// ).unwrap(); | ||
/// registry.register(Box::new(gauge.clone())); | ||
/// | ||
/// gauge.add(30); | ||
/// gauge.sub(10); | ||
/// | ||
/// // For the next 30 seconds, the metric will be 30 as that was the maximum value. | ||
/// // Afterwards, it will drop to 10. | ||
/// ``` | ||
#[derive(Clone, Debug)] | ||
pub struct MaximumOverIntervalGauge { | ||
// The current real-time value. | ||
value: Arc<AtomicI64>, | ||
// The maximum value in the current interval. | ||
maximum_value: Arc<AtomicI64>, | ||
|
||
// The length of a given interval. | ||
interval_duration: Duration, | ||
// The time at which the current interval will expose. | ||
interval_expiry: Arc<RwLock<Instant>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We actually use an |
||
|
||
gauge: PullingGauge, | ||
} | ||
|
||
impl MaximumOverIntervalGauge { | ||
/// Create a new [`MaximumOverIntervalGauge`]. | ||
pub fn new<S1: Into<String>, S2: Into<String>>( | ||
name: S1, | ||
help: S2, | ||
interval: Duration, | ||
) -> Result<Self, Error> { | ||
let maximum_value = Arc::new(AtomicI64::new(0)); | ||
|
||
Ok(Self { | ||
value: Arc::new(AtomicI64::new(0)), | ||
maximum_value: maximum_value.clone(), | ||
|
||
interval_expiry: Arc::new(RwLock::new(Instant::now() + interval)), | ||
interval_duration: interval, | ||
gauge: PullingGauge::new( | ||
name, | ||
help, | ||
Box::new(move || maximum_value.load(Ordering::Relaxed) as f64), | ||
)?, | ||
}) | ||
} | ||
|
||
/// Increments the gauge by 1. | ||
pub fn inc(&self) { | ||
self.apply_delta(1); | ||
} | ||
|
||
/// Decrements the gauge by 1. | ||
pub fn dec(&self) { | ||
self.apply_delta(-1); | ||
} | ||
|
||
/// Add the given value to the gauge. | ||
/// | ||
/// (The value can be negative, resulting in a decrement of the gauge.) | ||
pub fn add(&self, v: i64) { | ||
self.apply_delta(v); | ||
} | ||
|
||
/// Subtract the given value from the gauge. | ||
/// | ||
/// (The value can be negative, resulting in an increment of the gauge.) | ||
pub fn sub(&self, v: i64) { | ||
self.apply_delta(-v); | ||
} | ||
|
||
fn apply_delta(&self, delta: i64) { | ||
let previous_value = self.value.fetch_add(delta, Ordering::Relaxed); | ||
let new_value = previous_value + delta; | ||
|
||
let now = Instant::now(); | ||
let interval_expiry = self.interval_expiry.upgradable_read(); | ||
let loaded_interval_expiry = *interval_expiry; | ||
|
||
// Check whether we've crossed into the new interval. | ||
if loaded_interval_expiry < now { | ||
// There's a possible optimization here of using try_upgrade in a loop. Need to write | ||
// benchmarks to verify. | ||
let mut interval_expiry = RwLockUpgradableReadGuard::upgrade(interval_expiry); | ||
|
||
// Did we get to be the thread that actually started the new interval? Other threads | ||
// could have updated the value before we got the exclusive lock. | ||
if *interval_expiry == loaded_interval_expiry { | ||
*interval_expiry = now + self.interval_duration; | ||
self.maximum_value.store(new_value, Ordering::Relaxed); | ||
|
||
return; | ||
} | ||
} | ||
|
||
// Set the maximum_value to the max of the current value & previous max. | ||
self.maximum_value.fetch_max(new_value, Ordering::Relaxed); | ||
} | ||
} | ||
|
||
impl Collector for MaximumOverIntervalGauge { | ||
fn desc(&self) -> Vec<&crate::core::Desc> { | ||
self.gauge.desc() | ||
} | ||
|
||
fn collect(&self) -> Vec<crate::proto::MetricFamily> { | ||
// Apply a delta of '0' to ensure that the reset-value-if-interval-expired-logic kicks in. | ||
self.apply_delta(0); | ||
|
||
self.gauge.collect() | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use mock_instant::MockClock; | ||
|
||
use super::*; | ||
|
||
static INTERVAL: Duration = Duration::from_secs(30); | ||
|
||
#[test] | ||
fn test_correct_behaviour() { | ||
let gauge = MaximumOverIntervalGauge::new( | ||
"test_counter".to_string(), | ||
"This won't help you".to_string(), | ||
INTERVAL, | ||
) | ||
.unwrap(); | ||
|
||
assert_metric_value(&gauge, 0.0); | ||
|
||
gauge.add(5); | ||
|
||
assert_metric_value(&gauge, 5.0); | ||
|
||
gauge.dec(); | ||
|
||
// The value should still be five after we decreased it as the max within the interval was 5. | ||
assert_metric_value(&gauge, 5.0); | ||
|
||
MockClock::advance(INTERVAL + Duration::from_secs(1)); | ||
|
||
// The value should be 4 now as the next interval has started. | ||
assert_metric_value(&gauge, 4.0); | ||
} | ||
|
||
#[test] | ||
fn test_cloning() { | ||
let gauge = MaximumOverIntervalGauge::new( | ||
"test_counter".to_string(), | ||
"This won't help you".to_string(), | ||
INTERVAL, | ||
) | ||
.unwrap(); | ||
|
||
let same_gauge = gauge.clone(); | ||
|
||
assert_metric_value(&gauge, 0.0); | ||
|
||
gauge.add(5); | ||
|
||
// Read from the cloned gauge to veriy that they share data. | ||
assert_metric_value(&same_gauge, 5.0); | ||
} | ||
|
||
fn assert_metric_value(gauge: &MaximumOverIntervalGauge, val: f64) { | ||
let result = gauge.collect(); | ||
|
||
let metric_family = result | ||
.first() | ||
.expect("expected one MetricFamily to be returned"); | ||
|
||
let metric = metric_family | ||
.get_metric() | ||
.first() | ||
.expect("expected one Metric to be returned"); | ||
|
||
assert_eq!(val, metric.get_gauge().get_value()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
use std::{collections::HashMap, fmt, sync::Arc}; | ||
|
||
use crate::{ | ||
core::Collector, | ||
proto::{Gauge, Metric, MetricFamily, MetricType}, | ||
}; | ||
use protobuf::RepeatedField; | ||
|
||
/// A [Gauge] that returns the value from a provided function on every collect run. | ||
/// | ||
/// This metric is the equivalant of Go's | ||
/// <https://pkg.go.dev/github.com/prometheus/[email protected]/prometheus#GaugeFunc> | ||
/// | ||
/// # Examples | ||
/// ``` | ||
/// # use prometheus::{Registry, PullingGauge}; | ||
/// # // We are stubbing out std::thread::available_parallelism since it's not available in the | ||
/// # // oldest Rust version that we support. | ||
/// # fn available_parallelism() -> f64 { 0.0 } | ||
/// | ||
/// let registry = Registry::new(); | ||
/// let gauge = PullingGauge::new( | ||
/// "available_parallelism", | ||
/// "The available parallelism, usually the numbers of logical cores.", | ||
/// Box::new(|| available_parallelism()) | ||
/// ).unwrap(); | ||
/// registry.register(Box::new(gauge)); | ||
/// ``` | ||
#[derive(Clone)] | ||
pub struct PullingGauge { | ||
desc: crate::core::Desc, | ||
value: Arc<Box<dyn Fn() -> f64 + Send + Sync>>, | ||
} | ||
|
||
impl fmt::Debug for PullingGauge { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.debug_struct("PullingGauge") | ||
.field("desc", &self.desc) | ||
.field("value", &"<opaque>") | ||
.finish() | ||
} | ||
} | ||
|
||
impl PullingGauge { | ||
/// Create a new [`PullingGauge`]. | ||
pub fn new<S1: Into<String>, S2: Into<String>>( | ||
name: S1, | ||
help: S2, | ||
value: Box<dyn Fn() -> f64 + Send + Sync>, | ||
) -> crate::Result<Self> { | ||
Ok(PullingGauge { | ||
value: Arc::new(value), | ||
desc: crate::core::Desc::new(name.into(), help.into(), Vec::new(), HashMap::new())?, | ||
}) | ||
} | ||
|
||
fn metric(&self) -> Metric { | ||
let mut gauge = Gauge::default(); | ||
let getter = &self.value; | ||
gauge.set_value(getter()); | ||
|
||
let mut metric = Metric::default(); | ||
metric.set_gauge(gauge); | ||
|
||
metric | ||
} | ||
} | ||
|
||
impl Collector for PullingGauge { | ||
fn desc(&self) -> Vec<&crate::core::Desc> { | ||
vec![&self.desc] | ||
} | ||
|
||
fn collect(&self) -> Vec<crate::proto::MetricFamily> { | ||
let mut m = MetricFamily::default(); | ||
m.set_name(self.desc.fq_name.clone()); | ||
m.set_help(self.desc.help.clone()); | ||
m.set_field_type(MetricType::GAUGE); | ||
m.set_metric(RepeatedField::from_vec(vec![self.metric()])); | ||
vec![m] | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::metrics::Collector; | ||
|
||
#[test] | ||
fn test_pulling_gauge() { | ||
const VALUE: f64 = 10.0; | ||
|
||
let gauge = | ||
PullingGauge::new("test_gauge", "Purely for testing", Box::new(|| VALUE)).unwrap(); | ||
|
||
let metrics = gauge.collect(); | ||
assert_eq!(metrics.len(), 1); | ||
|
||
assert_eq!(VALUE, metrics[0].get_metric()[0].get_gauge().get_value()); | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and everywhere else, gauges typically operate on
f64
values.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was purely an i64 because we weren't clear on how to implement an atomic f64 (nor did we care enough). I just discovered your awesome https://github.com/tikv/rust-prometheus/blob/master/src/atomic64.rs#L89 setup -- will switch this over!