Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Perf][Metrics] Use flurry's concurrent hashmap for 5x throughput #2305

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tokio = { workspace = true, features = ["rt", "time"], optional = true }
tokio-stream = { workspace = true, optional = true }
http = { workspace = true, optional = true }
tracing = {workspace = true, optional = true}
flurry = "0.5.1"

[package.metadata.docs.rs]
all-features = true
Expand Down
75 changes: 37 additions & 38 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
mod sum;

use core::fmt;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::collections::HashSet;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;

use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
Expand Down Expand Up @@ -49,10 +48,12 @@
/// updates to the underlying value trackers should be performed.
pub(crate) struct ValueMap<A>
where
A: Aggregator,
A: Aggregator + Send + Sync,
{
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
trackers: flurry::HashMap<Vec<KeyValue>, Arc<A>>,
/// Lock to ensure that only one writer can write to the `trackers` map at a time.
write_lock: std::sync::Mutex<()>,
/// Number of distinct attribute sets stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
Expand All @@ -65,11 +66,12 @@

impl<A> ValueMap<A>
where
A: Aggregator,
A: Aggregator + Send + Sync,
{
fn new(config: A::InitConfig) -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
trackers: flurry::HashMap::new(),
write_lock: std::sync::Mutex::new(()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: A::create(&config),
count: AtomicUsize::new(0),
Expand All @@ -84,51 +86,46 @@
return;
}

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
let hashmap_ref = self.trackers.pin();
if let Some(tracker) = hashmap_ref.get(attributes) {
tracker.update(value);
return;
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
if let Some(tracker) = hashmap_ref.get(sorted_attrs.as_slice()) {
tracker.update(value);
return;
}

// Give up the read lock before acquiring the write lock.
drop(trackers);

let Ok(mut trackers) = self.trackers.write() else {
let Ok(_write_lock) = self.write_lock.lock() else {
return;
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
if let Some(tracker) = hashmap_ref.get(attributes) {
tracker.update(value);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
} else if let Some(tracker) = hashmap_ref.get(sorted_attrs.as_slice()) {
tracker.update(value);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(A::create(&self.config));
new_tracker.update(value);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);
hashmap_ref.insert(attributes.to_vec(), new_tracker.clone());
hashmap_ref.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
} else if let Some(overflow_value) = hashmap_ref.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice())
{
overflow_value.update(value);
} else {
let new_tracker = A::create(&self.config);
new_tracker.update(value);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
hashmap_ref.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
otel_warn!( name: "ValueMap.measure",
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
);
Expand All @@ -146,12 +143,9 @@
dest.push(map_fn(vec![], &self.no_attribute_tracker));
}

let Ok(trackers) = self.trackers.read() else {
return;
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
let hashmap_ref = self.trackers.pin();
for (attrs, tracker) in hashmap_ref.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
dest.push(map_fn(attrs.clone(), tracker));
}
Expand All @@ -172,18 +166,23 @@
));
}

let trackers = match self.trackers.write() {
Ok(mut trackers) => {
self.count.store(0, Ordering::SeqCst);
take(trackers.deref_mut())
}
Err(_) => todo!(),
let Ok(_write_lock) = self.write_lock.lock() else {
return;

Check warning on line 170 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L170

Added line #L170 was not covered by tests
};

self.count.store(0, Ordering::SeqCst);
let trackers = self.trackers.clone();
self.trackers.pin().clear();

drop(_write_lock);

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.into_iter() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
for (attrs, tracker) in trackers.pin().into_iter() {
if seen.insert(Arc::as_ptr(tracker)) {
dest.push(map_fn(
attrs.to_vec(),
tracker.clone_and_reset(&self.config),
));
}
}
}
Expand Down
47 changes: 45 additions & 2 deletions opentelemetry/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::borrow::{Borrow, Cow};
use std::cmp::Ordering;
use std::sync::Arc;
use std::{fmt, hash};

Expand Down Expand Up @@ -163,6 +164,26 @@
String(Vec<StringValue>),
}

impl Eq for Array {}

impl PartialOrd for Array {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}

Check warning on line 172 in opentelemetry/src/common.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/common.rs#L170-L172

Added lines #L170 - L172 were not covered by tests
}

impl Ord for Array {
fn cmp(&self, other: &Self) -> Ordering {
match (self, other) {
(Array::Bool(a), Array::Bool(b)) => a.cmp(b),
(Array::I64(a), Array::I64(b)) => a.cmp(b),
(Array::F64(a), Array::F64(b)) => a.partial_cmp(b).unwrap(),
(Array::String(a), Array::String(b)) => a.cmp(b),
(_a, _b) => todo!(),

Check warning on line 182 in opentelemetry/src/common.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/common.rs#L176-L182

Added lines #L176 - L182 were not covered by tests
}
}

Check warning on line 184 in opentelemetry/src/common.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/common.rs#L184

Added line #L184 was not covered by tests
}

impl fmt::Display for Array {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down Expand Up @@ -229,9 +250,31 @@
Array(Array),
}

impl Eq for Value {}

impl PartialOrd for Value {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}

Check warning on line 258 in opentelemetry/src/common.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/common.rs#L256-L258

Added lines #L256 - L258 were not covered by tests
}

impl Ord for Value {
fn cmp(&self, other: &Self) -> Ordering {
match (self, other) {
(Value::Bool(a), Value::Bool(b)) => a.cmp(b),
(Value::I64(a), Value::I64(b)) => a.cmp(b),
(Value::F64(a), Value::F64(b)) => a.partial_cmp(b).unwrap(),
(Value::String(a), Value::String(b)) => a.as_str().cmp(b.as_str()),
(Value::Array(a), Value::Array(b)) => a.cmp(b),

Check warning on line 268 in opentelemetry/src/common.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/common.rs#L262-L268

Added lines #L262 - L268 were not covered by tests
// (a, b) => a.as_str().cmp(&b.as_str()),
(_a, _b) => todo!(),

Check warning on line 270 in opentelemetry/src/common.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/common.rs#L270

Added line #L270 was not covered by tests
}
}

Check warning on line 272 in opentelemetry/src/common.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/common.rs#L272

Added line #L272 was not covered by tests
}

/// Wrapper for string-like values
#[non_exhaustive]
#[derive(Clone, PartialEq, Eq, Hash)]
#[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
pub struct StringValue(OtelString);

impl fmt::Debug for StringValue {
Expand Down Expand Up @@ -375,7 +418,7 @@
}

/// A key-value pair describing an attribute.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialOrd, Ord, PartialEq)]
#[non_exhaustive]
pub struct KeyValue {
/// The attribute name
Expand Down
Loading