diff --git a/django_tests/conftest.py b/django_tests/conftest.py
index 1c16dda..768931d 100644
--- a/django_tests/conftest.py
+++ b/django_tests/conftest.py
@@ -34,15 +34,19 @@ def pytest_configure(config):
     with suppress(Exception):
         print('Stop old container if exists')
         os.system('docker stop testpostgres')
+        os.system('docker stop testredis')
         time.sleep(1)
     with suppress(Exception):
         os.system('docker rm testpostgres')
+        os.system('docker rm testredis')
     try:
         print('Start docker')
         succ('docker run -d -p 40001:5432 -e POSTGRES_PASSWORD=password '
              '--name=testpostgres postgres:13')
+        succ('docker run -d -p 40002:6379 --name=testredis redis:6.2.4')
         time.sleep(5)
         wait_socket('localhost', 40001, timeout=15)
+        wait_socket('localhost', 40002, timeout=15)
         create_database()
     except Exception as e:
         print('EXCEPTION: {}'.format(e))
@@ -53,7 +57,9 @@ def pytest_unconfigure(config):
     if config.getoption('docker_skip') or config.getoption('keep_db'):
         return
     os.system('docker stop -t 2 testpostgres')
+    os.system('docker stop -t 2 testredis')
     os.system('docker rm testpostgres')
+    os.system('docker rm testredis')
 
 
 @pytest.fixture(autouse=True)
diff --git a/fan_tools/red_metrics.py b/fan_tools/red_metrics.py
new file mode 100644
index 0000000..9b5acc5
--- /dev/null
+++ b/fan_tools/red_metrics.py
@@ -0,0 +1,215 @@
+import asyncio
+import datetime
+import functools
+import logging
+import time
+from contextlib import contextmanager
+from dataclasses import dataclass
+from enum import Enum
+from statistics import mean
+from typing import List, Optional, Union
+
+import redis
+
+logger = logging.getLogger(__name__)
+
+
+def extend_enum(inherited_enum):
+    def wrapper(added_enum):
+        joined = {}
+        for item in inherited_enum:
+            joined[item.name] = item.value
+        for item in added_enum:
+            joined[item.name] = item.value
+        return Enum(added_enum.__name__, joined)
+
+    return wrapper
+
+
+class SimpleMetrics(Enum):
+    requests_count = 'requests_count'
+    errors_count = 'errors_count'
+    duration = 'duration'
+
+    @classmethod
+    def list(cls):
+        return list(map(lambda c: c.value, cls))
+
+
+@extend_enum(SimpleMetrics)
+class Metrics(Enum):
+    duration_value = 'duration_value'
+
+
+class MetricsStorage:
+    simple_metrics = SimpleMetrics.list()
+
+    def incr(self, key):
+        raise NotImplementedError
+
+    def set(self, key: str, value: Union[int, float]):
+        raise NotImplementedError
+
+    def get(self, key: str):
+        raise NotImplementedError
+
+    def sliding_window(self, key: str, value: Union[int, float]):
+        raise NotImplementedError
+
+    def get_metrics(self, key: str, mean_count: int = 10) -> List[str]:
+        raise NotImplementedError
+
+
+class RedisMetricsStorage(MetricsStorage):
+    def __init__(self, redis_url: str = None, expiration: int = None):
+        self.redis_url = redis_url
+        self.connection = self.connect()
+        self.expiration = expiration or 60 * 60 * 24 * 30
+
+    def connect(self) -> Optional[redis.Redis]:
+        if not self.redis_url:
+            return None
+        try:
+            r = redis.Redis.from_url(self.redis_url)
+            r.ping()
+            return r
+        except redis.connection.ConnectionError:
+            return None
+
+    def get(self, key: str):
+        return self.connection.get(key)
+
+    def incr(self, key: str):
+        self.connection.incr(key)
+
+    def sliding_window(self, key: str, value: Union[int, float]):
+        ts = int(datetime.datetime.now().timestamp() * 1_000_000)
+        key = f'{key}:{ts}'
+        self.connection.set(key, value, ex=self.expiration)
+
+    def set(self, key: str, value: Union[int, float]):
+        self.connection.set(key, value, ex=self.expiration)
+
+    def _get_metric_keys(self, key: str, mean_count: int = 10):
+        keys = self.connection.scan_iter(
+            match=f'{key}:{Metrics.duration_value.value}:*',
+            count=1000,
+        )
+        to_delete = []
+        metrics_keys = [f'{key}:{metric}' for metric in self.simple_metrics]
+        metric_duration_keys = []
+        for i in keys:
+            if f'{key}:{Metrics.duration_value.value}:'.encode() in i:
+                metric_duration_keys.append(i.decode())
+        metric_duration_keys.sort()
+        metrics_keys.extend(metric_duration_keys[-mean_count:])
+        to_delete.extend(metric_duration_keys[:-mean_count])
+        return metrics_keys, to_delete
+
+    def _fetch_metrics(self, metric_keys: List[str]):
+        metrics = self.connection.mget(metric_keys)
+        items = dict(zip(metric_keys, metrics))
+        return items
+
+    def _get_metrics(self, key: str, metrics: dict) -> List[str]:
+        result = []
+        for metric in self.simple_metrics:
+            _key = f'{key}:{metric}'
+            val = metrics.pop(_key)
+            result.append(f'{key}_{metric} {val.decode() if val else 0}')
+        duration_key = f'{key}:{Metrics.duration_value.value}:'
+        values = [float(v) for i, v in metrics.items() if str(i).startswith(duration_key)]
+        value = mean(values) if values else 0
+        result.append(f'{key}_duration_avg {value}')
+        return result
+
+    def get_metrics(self, key: str, mean_count: int = 10) -> List[str]:
+        metric_keys, expired_keys = self._get_metric_keys(key, mean_count=mean_count)
+        if expired_keys:
+            self.connection.delete(*expired_keys)
+        metrics = self._fetch_metrics(metric_keys)
+        return self._get_metrics(key=key, metrics=metrics)
+
+
+def red_metrics(name: str = None, storage: MetricsStorage = None):
+    """
+    Decorator to serve RED (Rate, Errors, Duration) metrics.
+    Usage:
+    >>> redis_storage = RedisMetricsStorage('redis://localhost:6379/1')
+    ... @red_metrics(name='some_code_metrics', storage=redis_storage)
+    ... def some_code_to_profile(): ...
+    """
+
+    def get_metric_name(func, name: str = None) -> str:
+        return name or f'{func.__module__}.{func.__name__}'
+
+    @contextmanager
+    def prometheus_context(name: str):
+        start = time.time()
+        is_success = True
+        try:
+            yield
+        except:  # noqa: E722
+            is_success = False
+            raise
+        finally:
+            if not storage:
+                return
+            execution_time = time.time() - start
+            if is_success:
+                storage.incr(f'{name}:{Metrics.requests_count.value}')
+            else:
+                storage.incr(f'{name}:{Metrics.errors_count.value}')
+            storage.sliding_window(f'{name}:{Metrics.duration_value.value}', execution_time)
+            storage.set(f'{name}:{Metrics.duration.value}', execution_time)
+
+    def wrapper(func):
+        metric_name = get_metric_name(func, name)
+        if not asyncio.iscoroutinefunction(func):
+
+            @functools.wraps(func)
+            def wrapped(*args, **kwargs):
+                with prometheus_context(name=metric_name):
+                    return func(*args, **kwargs)
+
+        else:
+
+            @functools.wraps(func)
+            async def wrapped(*args, **kwargs):
+                with prometheus_context(name=metric_name):
+                    return await func(*args, **kwargs)
+
+        return wrapped
+
+    return wrapper
+
+
+@dataclass
+class ExporterMetricItem:
+    name: str
+    mean_count: int = 10
+
+
+class PrometheusExporter:
+    """
+    Exporter providing RED (Rate, Errors, Duration) metrics.
+    Usage:
+    >>> redis_storage = RedisMetricsStorage('redis://localhost:6379/1')
+    ... exporter = PrometheusExporter(storage=redis_storage)
+    ... exporter.metrics([ExporterMetricItem(name='some_code_metrics', mean_count=5)])
+    """
+
+    def __init__(self, storage: MetricsStorage = None):
+        self.storage = storage
+
+    def metrics(self, metrics: List[ExporterMetricItem] = None) -> List[str]:
+        if not self.storage or not metrics:
+            return []
+        result = []
+        for metric in metrics:
+            metric_result = self.storage.get_metrics(
+                key=metric.name,
+                mean_count=metric.mean_count,
+            )
+            result.extend(metric_result)
+        return result
diff --git a/setup.py b/setup.py
index c8eedde..a1599f7 100644
--- a/setup.py
+++ b/setup.py
@@ -37,6 +37,7 @@
         'gitlab_monitoring': ['python-gitlab==1.0.2', SANIC],
         'image_utils': ['Pillow'],
         'otel': ['opentelemetry-instrumentation-django', 'opentelemetry-instrumentation-psycopg2'],
+        'red_metrics': ['redis'],
     },
     tests_require=['pytest==3.1.3'],
     entry_points={
diff --git a/tests/test_red_metrics.py b/tests/test_red_metrics.py
new file mode 100644
index 0000000..9bede6a
--- /dev/null
+++ b/tests/test_red_metrics.py
@@ -0,0 +1,148 @@
+import time
+
+import pytest
+
+from fan_tools.red_metrics import (
+    ExporterMetricItem,
+    Metrics,
+    MetricsStorage,
+    PrometheusExporter,
+    RedisMetricsStorage,
+    red_metrics,
+)
+
+
+@pytest.fixture
+def redis_url():
+    return 'redis://localhost:40002/1'
+
+
+@pytest.fixture
+def redis_storage(redis_url):
+    return RedisMetricsStorage(redis_url=redis_url)
+
+
+@pytest.fixture
+def metrics_data(redis_storage):
+    def _create(key: str):
+        redis_storage.set(f'{key}:{Metrics.duration.value}', 0.5)
+        redis_storage.incr(f'{key}:{Metrics.requests_count.value}')
+        redis_storage.incr(f'{key}:{Metrics.errors_count.value}')
+        for i in range(50):
+            redis_storage.sliding_window(f'{key}:{Metrics.duration_value.value}', i)
+
+    yield _create
+
+
+class TestREDMetricsDecorator:
+    def test_success(self, redis_storage):
+        metric = red_metrics(name='test_metric', storage=redis_storage)
+        assert metric(lambda: 'baz')() == 'baz'
+
+    def test_without_storage(self):
+        metric = red_metrics(name='test_metric')
+        assert metric(lambda: 'baz')() == 'baz'
+
+    def test_not_implemented_storage(self):
+        metric = red_metrics(name='test_metric', storage=MetricsStorage())
+        with pytest.raises(NotImplementedError):
+            metric(lambda: 'baz')()
+
+
+class TestRedisMetricsStorage:
+    def test_invalid_redis_url(self):
+        with pytest.raises(ValueError) as exc:
+            RedisMetricsStorage(redis_url='invalid_redis_url')
+        assert (
+            str(exc.value)
+            == 'Redis URL must specify one of the following schemes (redis://, rediss://, unix://)'
+        )
+
+    def test_incr(self, redis_storage):
+        key = 'incr_key'
+        redis_storage.incr(key)
+        assert int(redis_storage.get(key)) == 1
+
+    def test_set(self, redis_storage):
+        key = 'set_key'
+        redis_storage.set(key, 123)
+        assert int(redis_storage.get(key)) == 123
+
+    def test_sliding_window(self, redis_storage):
+        key = 'sliding_window_key'
+        for i in range(5):
+            redis_storage.sliding_window(key, i)
+        keys = redis_storage.connection.scan(cursor=0, match=f'{key}:*')[1]
+        metrics = redis_storage._fetch_metrics(keys)
+        metrics = [int(metric) for metric in metrics.values()]
+        assert set(metrics) == set(list(range(5)))
+
+    def test_get_metrics(self, redis_storage, metrics_data):
+        key = 'metric'
+        metrics = redis_storage.get_metrics(key)
+        assert metrics == [
+            f'{key}_requests_count 0',
+            f'{key}_errors_count 0',
+            f'{key}_duration 0',
+            f'{key}_duration_avg 0',
+        ]
+
+        metrics_data(key)
+        metrics = redis_storage.get_metrics(key)
+        assert metrics == [
+            f'{key}_requests_count 1',
+            f'{key}_errors_count 1',
+            f'{key}_duration 0.5',
+            f'{key}_duration_avg 44.5',
+        ]
+        metrics = redis_storage.get_metrics(key, mean_count=3)
+        assert metrics == [
+            f'{key}_requests_count 1',
+            f'{key}_errors_count 1',
+            f'{key}_duration 0.5',
+            f'{key}_duration_avg 48.0',
+        ]
+
+
+class TestPrometheusExporter:
+    def test_without_storage(self):
+        exporter = PrometheusExporter()
+        assert exporter.metrics() == []
+
+    def test_metrics(self, redis_storage, metrics_data):
+        metrics_data('simple_metric')
+        exporter = PrometheusExporter(storage=redis_storage)
+        assert exporter.metrics(
+            metrics=[
+                ExporterMetricItem(name='simple_metric'),
+                ExporterMetricItem(name='not_exists_metric'),
+            ]
+        ) == [
+            'simple_metric_requests_count 1',
+            'simple_metric_errors_count 1',
+            'simple_metric_duration 0.5',
+            'simple_metric_duration_avg 44.5',
+            'not_exists_metric_requests_count 0',
+            'not_exists_metric_errors_count 0',
+            'not_exists_metric_duration 0',
+            'not_exists_metric_duration_avg 0',
+        ]
+
+
+def test_decorator_and_exporter(redis_storage):
+    metric = red_metrics(name='test', storage=redis_storage)
+    for _ in range(3):
+        metric(lambda: time.sleep(0.1))()
+    try:
+        metric(lambda: 1 / 0)()
+    except ZeroDivisionError:
+        pass
+    exporter = PrometheusExporter(storage=redis_storage)
+    metrics = exporter.metrics(
+        metrics=[
+            ExporterMetricItem(name='test'),
+        ]
+    )
+    assert len(metrics) == 4
+    assert 'test_requests_count 3' in metrics
+    assert 'test_errors_count 1' in metrics
diff --git a/tox.ini b/tox.ini
index 18f5952..48faa19 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,6 +17,7 @@ deps =
     sanic==20.9.0
    six
     starlette==0.13.8
+    redis
     uvloop==0.14.0
     django22: djangorestframework==3.9.*
     django22: Django==2.2.*