replace HashMaps & fix histogram ordering
Llandy3d committed Feb 25, 2024
1 parent efbe075 commit 899f324
Showing 2 changed files with 243 additions and 20 deletions.
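
A minimal standalone sketch (not taken from the diff) of why swapping HashMap for BTreeMap fixes the ordering: HashMap iterates in an unspecified order that can change between runs, while BTreeMap always iterates in sorted key order, so samples grouped by label hash come out deterministically. The names and values below are illustrative only.

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    let labels = [("cat", 1.0), ("bobby", 2.0), ("zed", 3.0)];

    // HashMap iteration order is unspecified and may differ between runs,
    // so samples rendered from it can come out in any order.
    let unordered: HashMap<&str, f64> = labels.iter().copied().collect();
    println!("{:?}", unordered.keys().collect::<Vec<_>>());

    // BTreeMap iterates in sorted key order, giving a stable ordering
    // for the generated samples.
    let ordered: BTreeMap<&str, f64> = labels.iter().copied().collect();
    println!("{:?}", ordered.keys().collect::<Vec<_>>()); // ["bobby", "cat", "zed"]
}
```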
17 changes: 8 additions & 9 deletions src/lib.rs
@@ -7,7 +7,7 @@ use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyType};
use redis::{from_redis_value, ConnectionLike, FromRedisValue, RedisResult, Value};
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Mutex, OnceLock};
use std::thread;
@@ -145,14 +145,14 @@ fn add_job_to_pipeline(received: RedisJob, pipe: &mut redis::Pipeline) {
#[derive(Debug)]
enum PipelineResult {
Float(f64),
Hash(HashMap<String, String>),
Hash(BTreeMap<String, String>),
}

impl FromRedisValue for PipelineResult {
fn from_redis_value(v: &Value) -> RedisResult<Self> {
let result = match v {
Value::Bulk(_) => {
let map: HashMap<String, String> = from_redis_value(v)?;
let map: BTreeMap<String, String> = from_redis_value(v)?;
PipelineResult::Hash(map)
}
_ => {
@@ -483,7 +483,7 @@ impl RedisBackend {
}

PipelineResult::Hash(hash) => {
let mut ordered_samples = HashMap::new();
let mut ordered_samples = BTreeMap::new();
let count_hash = hash;
current_value = values_iterator.next().unwrap();
let sum_hash = {
@@ -610,7 +610,7 @@ impl RedisBackend {
.map(|bound| bound.as_str())
.chain(extra_suffixes);

let mut ordered_samples = HashMap::new();
let mut ordered_samples = BTreeMap::new();

for suffix in suffixes {
let mut hash = hash;
@@ -694,10 +694,9 @@ impl RedisBackend {
}
}
}

for ordered_samples_list in ordered_samples.values_mut() {
samples_list.append(ordered_samples_list);
}
}
for ordered_samples_list in ordered_samples.values_mut() {
samples_list.append(ordered_samples_list);
}
}
},
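
The second part of the fix moves the append loop out of the per-suffix block so each label set's samples are flushed once, after everything has been collected. Below is a hedged sketch of that pattern with a hypothetical, simplified sample type; the real types in src/lib.rs differ.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-in for the real sample type in src/lib.rs.
pub struct Sample {
    pub labels: String, // serialized label hash, e.g. "{\"bob\":\"cat\"}"
    pub value: f64,
}

pub fn collect_ordered(raw: Vec<Sample>) -> Vec<Sample> {
    // Group samples per label hash; a BTreeMap keeps the groups sorted by key.
    let mut ordered_samples: BTreeMap<String, Vec<Sample>> = BTreeMap::new();
    for sample in raw {
        ordered_samples
            .entry(sample.labels.clone())
            .or_default()
            .push(sample);
    }

    // Append each group only after every sample has been processed, so one
    // label set's lines stay together and the groups come out in sorted
    // order, mirroring the loop moved outside the block in the hunk above.
    let mut samples_list = Vec::new();
    for ordered_samples_list in ordered_samples.values_mut() {
        samples_list.append(ordered_samples_list);
    }
    samples_list
}
```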
246 changes: 235 additions & 11 deletions tests/test_redis.py
@@ -1,20 +1,34 @@
import time
import pytest

from concurrent.futures import ProcessPoolExecutor
from pytest_redis import factories
from pytheus.backends import load_backend
from pytheus.metrics import Counter, Histogram, Gauge, Summary
from pytheus.metrics import Counter, Histogram, Gauge, Summary, Sample
from pytheus.registry import CollectorRegistry
from pytheus_backend_rs import RedisBackend
from pytheus.exposition import generate_metrics


redis_server = factories.redis_proc(port=9000)
redis_client = factories.redisdb('redis_server', decode=True)
# when running a single test the backend might not be picked up,
# so we enforce it here
@pytest.fixture(autouse=True)
def load_redis_backend():
load_backend(RedisBackend, {"host": "localhost", "port": 6379})


def test_smoke(redis_client):
load_backend(RedisBackend, {"host": "localhost", "port": 9000})
import redis

redis_client = redis.Redis(**{"host": "localhost", "port": 6379}, decode_responses=True)

# automatically clear redis after every test function
@pytest.fixture(autouse=True)
# def clear_redis(redis_client):
def clear_redis():
redis_client.flushall()


def test_smoke():
load_backend(RedisBackend, {"host": "localhost", "port": 6379})
counter = Counter("smoke", "smoke")
counter.inc()
time.sleep(0.01)
@@ -37,7 +51,7 @@ def test_create_backend_labeled():

assert backend.key_name == counter.name
assert backend.histogram_bucket is None
assert backend.labels_hash == "cat"
assert backend.labels_hash == '{"bob":"cat"}'


def test_create_backend_labeled_with_default():
@@ -46,7 +60,7 @@ def test_create_backend_labeled_with_default():

assert backend.key_name == counter.name
assert backend.histogram_bucket is None
assert backend.labels_hash == "cat"
assert backend.labels_hash == '{"bob":"cat"}'


def test_create_backend_labeled_with_default_mixed():
@@ -58,7 +72,7 @@ def test_create_backend_labeled_with_default_mixed():

assert backend.key_name == counter.name
assert backend.histogram_bucket is None
assert backend.labels_hash == "cat-fish"
assert backend.labels_hash == '{"bob":"cat","bobby":"fish"}'


def test_create_backend_with_histogram_bucket():
@@ -71,7 +85,7 @@ def test_create_backend_with_histogram_bucket():
assert backend.labels_hash is None


def test_multiple_metrics_with_same_name_with_redis_overlap(redis_client):
def test_multiple_metrics_with_same_name_with_redis_overlap():
"""
If sharing the same database, single value metrics will be overlapping.
"""
@@ -112,7 +126,7 @@ def test_multiple_metrics_with_same_name_labeled_with_redis_do_not_overlap():
assert second_collector_metrics_count == 1


def test_multiple_metrics_with_same_name_labeled_with_redis_do_overlap_on_shared_child(redis_client):
def test_multiple_metrics_with_same_name_labeled_with_redis_do_overlap_on_shared_child():
"""
If sharing the same database, labeled metrics will be returned from collectors if having the
same child instance.
@@ -159,6 +173,7 @@ def test_generate_samples_with_labels():
)
counter.labels({"bob": "a"})
counter.labels({"bob": "b"})
time.sleep(0.1)
samples = RedisBackend._generate_samples(registry)
assert len(samples[counter._collector]) == 3

@@ -179,6 +194,7 @@ def _run_multiprocess(extra_label):
gauge.labels(bob="observable_only_on_one").inc(2.7)
summary.labels(bob="observable_only_on_one").observe(2.7)
histogram.labels(bob="observable_only_on_one").observe(2.7)
time.sleep(0.1)
return generate_metrics(registry)


@@ -194,3 +210,211 @@ def test_multiple_return_all_metrics_entries():
second_result = second_result.result()

assert first_result == second_result


class TestGenerateSamples:
def test_counter(self):
registry = CollectorRegistry()
counter = Counter("counter", "desc", registry=registry)

time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP counter desc\n"
"# TYPE counter counter\n"
'counter 0.0\n'
)

def test_counter_labeled(self):
registry = CollectorRegistry()
counter = Counter("counter", "desc", required_labels=["bob"], registry=registry)
counter.labels(bob="cat").inc(2.7)

time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP counter desc\n"
"# TYPE counter counter\n"
'counter{bob="cat"} 2.7\n'
)

def test_gauge(self):
registry = CollectorRegistry()
gauge = Gauge("gauge", "desc", registry=registry)

time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP gauge desc\n"
"# TYPE gauge gauge\n"
'gauge 0.0\n'
)

def test_gauge_labeled(self):
registry = CollectorRegistry()
gauge = Gauge("gauge", "desc", required_labels=["bob"], registry=registry)
gauge.labels(bob="cat").inc(2.7)

time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP gauge desc\n"
"# TYPE gauge gauge\n"
'gauge{bob="cat"} 2.7\n'
)

def test_metric_labeled_multiple(self):
registry = CollectorRegistry()
counter_labeled = Counter(
"counter_labeled", "desc", required_labels=["bob"], registry=registry
)
counter_labeled.labels(bob="cat").inc(2.7)
gauge = Gauge("gauge", "desc", required_labels=["bob"], registry=registry)
gauge.labels(bob="gage").inc(3.0)
gauge.labels(bob="blob").inc(3.2)
counter = Counter("counter", "desc", registry=registry)

time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP counter_labeled desc\n"
"# TYPE counter_labeled counter\n"
'counter_labeled{bob="cat"} 2.7\n'
"# HELP gauge desc\n"
"# TYPE gauge gauge\n"
'gauge{bob="blob"} 3.2\n'
'gauge{bob="gage"} 3.0\n'
"# HELP counter desc\n"
"# TYPE counter counter\n"
'counter 0.0\n'
)

def test_summary(self):
registry = CollectorRegistry()
summary = Summary("summary", "desc", registry=registry)

time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP summary desc\n"
"# TYPE summary summary\n"
'summary_count 0.0\n'
'summary_sum 0.0\n'
)

def test_summary_labeled(self):
registry = CollectorRegistry()
summary = Summary(
"summary",
"desc",
registry=registry,
required_labels=["bob"],
default_labels={"bob": "cat"},
)
summary.observe(7)

time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP summary desc\n"
"# TYPE summary summary\n"
'summary_count{bob="cat"} 1.0\n'
'summary_sum{bob="cat"} 7.0\n'
)

def test_histogram(self):
registry = CollectorRegistry()
histogram = Histogram("histogram", "desc", buckets=[1, 2, 3], registry=registry)
histogram.observe(2.7)
time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP histogram desc\n"
"# TYPE histogram histogram\n"
'histogram_bucket{le="1"} 0.0\n'
'histogram_bucket{le="2"} 0.0\n'
'histogram_bucket{le="3"} 1.0\n'
'histogram_bucket{le="+Inf"} 1.0\n'
'histogram_count 1.0\n'
'histogram_sum 2.7\n'
)

def test_histogram_labeled(self):
registry = CollectorRegistry()
histogram = Histogram(
"histogram", "desc", buckets=[1, 2, 3], required_labels=["bob"], registry=registry
)
histogram.labels(bob="cat").observe(2.7)

time.sleep(0.1)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP histogram desc\n"
"# TYPE histogram histogram\n"
'histogram_bucket{bob="cat",le="1"} 0.0\n'
'histogram_bucket{bob="cat",le="2"} 0.0\n'
'histogram_bucket{bob="cat",le="3"} 1.0\n'
'histogram_bucket{bob="cat",le="+Inf"} 1.0\n'
'histogram_count{bob="cat"} 1.0\n'
'histogram_sum{bob="cat"} 2.7\n'
)

def test_labeled_histogram_is_ordered(self):
registry = CollectorRegistry()
histogram = Histogram(
"histogram", "desc", buckets=[1, 2, 3], required_labels=["bob"], registry=registry
)
histogram.labels(bob="cat")
histogram.labels(bob="bobby")
time.sleep(0.1) # give time to write to redis
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP histogram desc\n"
"# TYPE histogram histogram\n"
'histogram_bucket{bob="bobby",le="1"} 0.0\n'
'histogram_bucket{bob="bobby",le="2"} 0.0\n'
'histogram_bucket{bob="bobby",le="3"} 0.0\n'
'histogram_bucket{bob="bobby",le="+Inf"} 0.0\n'
'histogram_count{bob="bobby"} 0.0\n'
'histogram_sum{bob="bobby"} 0.0\n'
'histogram_bucket{bob="cat",le="1"} 0.0\n'
'histogram_bucket{bob="cat",le="2"} 0.0\n'
'histogram_bucket{bob="cat",le="3"} 0.0\n'
'histogram_bucket{bob="cat",le="+Inf"} 0.0\n'
'histogram_count{bob="cat"} 0.0\n'
'histogram_sum{bob="cat"} 0.0\n'
)

def test_labeled_summary_is_ordered(self):
registry = CollectorRegistry()
summary = Summary("summary", "desc", required_labels=["bob"], registry=registry)
summary.labels(bob="cat")
summary.labels(bob="bobby")
time.sleep(0.1) # give time to write to redis
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP summary desc\n"
"# TYPE summary summary\n"
'summary_count{bob="bobby"} 0.0\n'
'summary_sum{bob="bobby"} 0.0\n'
'summary_count{bob="cat"} 0.0\n'
'summary_sum{bob="cat"} 0.0\n'
)

def test_labeled_not_observable(self):
registry = CollectorRegistry()
Counter("counter", "desc", required_labels=["bob"], registry=registry)
Gauge("gauge", "desc", required_labels=["bob"], registry=registry)
Summary("summary", "desc", required_labels=["bob"], registry=registry)
Histogram("histogram", "desc", required_labels=["bob"], registry=registry)
metrics_output = generate_metrics(registry)
assert metrics_output == (
"# HELP counter desc\n"
"# TYPE counter counter\n"
"# HELP gauge desc\n"
"# TYPE gauge gauge\n"
"# HELP summary desc\n"
"# TYPE summary summary\n"
"# HELP histogram desc\n"
"# TYPE histogram histogram\n"
)
