Skip to content

Commit 5bfd1f9

Browse files
authored
fix: enable prometheus multiprocessing feature (#357)
1 parent d4874d7 commit 5bfd1f9

File tree

12 files changed

+1030
-740
lines changed

12 files changed

+1030
-740
lines changed

bases/renku_data_services/data_api/main.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import sentry_sdk
99
import uvloop
10-
from prometheus_sanic import monitor
1110
from sanic import Sanic
1211
from sanic.log import logger
1312
from sanic.worker.loader import AppLoader
@@ -17,6 +16,7 @@
1716
from renku_data_services.app_config import Config
1817
from renku_data_services.authz.admin_sync import sync_admins_from_keycloak
1918
from renku_data_services.data_api.app import register_all_handlers
19+
from renku_data_services.data_api.prometheus import collect_system_metrics, setup_app_metrics, setup_prometheus
2020
from renku_data_services.errors.errors import (
2121
ForbiddenError,
2222
MissingResourceError,
@@ -31,11 +31,14 @@
3131
import sentry_sdk._types
3232

3333

34-
async def _send_messages() -> None:
34+
async def _send_messages(app: Sanic) -> None:
3535
config = Config.from_env()
3636
while True:
3737
try:
3838
await config.event_repo.send_pending_events()
39+
# we need to collect metrics for this background process separately from the task we add to the
40+
# server processes
41+
await collect_system_metrics(app, "send_events_worker")
3942
await asyncio.sleep(1)
4043
except (asyncio.CancelledError, KeyboardInterrupt) as e:
4144
logger.warning(f"Exiting: {e}")
@@ -45,14 +48,15 @@ async def _send_messages() -> None:
4548
raise
4649

4750

48-
def send_pending_events() -> None:
51+
def send_pending_events(app_name: str) -> None:
4952
"""Send pending messages in case sending in a handler failed."""
50-
_ = Sanic("send_events") # we need a dummy app for logging to work.
53+
app = Sanic(app_name)
54+
setup_app_metrics(app)
5155

5256
logger.info("running events sending loop.")
5357

5458
asyncio.set_event_loop(uvloop.new_event_loop())
55-
asyncio.run(_send_messages())
59+
asyncio.run(_send_messages(app))
5660

5761

5862
def create_app() -> Sanic:
@@ -104,9 +108,7 @@ async def setup_sentry(_: Sanic) -> None:
104108
logger.info(f"REAL_IP_HEADER = {app.config.REAL_IP_HEADER}")
105109

106110
app = register_all_handlers(app, config)
107-
108-
# Setup prometheus
109-
monitor(app, endpoint_type="url", multiprocess_mode="all", is_middleware=True).expose_endpoint()
111+
setup_prometheus(app)
110112

111113
if environ.get("CORS_ALLOW_ALL_ORIGINS", "false").lower() == "true":
112114
from sanic_ext import Extend
@@ -134,7 +136,7 @@ async def setup_rclone_validator(app: Sanic) -> None:
134136
async def ready(app: Sanic) -> None:
135137
"""Application ready event handler."""
136138
logger.info("starting events background job.")
137-
app.manager.manage("SendEvents", send_pending_events, {}, transient=True)
139+
app.manager.manage("SendEvents", send_pending_events, {"app_name": config.app_name}, transient=True)
138140

139141
return app
140142

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Prometheus Metrics."""
2+
3+
import asyncio
4+
import resource
5+
6+
import aiofiles
7+
from prometheus_client import Gauge
8+
from prometheus_sanic import monitor
9+
from prometheus_sanic.constants import BaseMetrics
10+
from prometheus_sanic.metrics import init
11+
from sanic import Sanic
12+
13+
_PAGESIZE = resource.getpagesize()
14+
PROMETHEUS_VIRTUAL_MEMORY = "sanic_process_virtual_memory_bytes"
15+
PROMETHEUS_RESIDENT_MEMORY = "sanic_process_resident_memory_bytes"
16+
PROMETHEUS_METRICS_LIST = [
17+
(
18+
PROMETHEUS_VIRTUAL_MEMORY,
19+
Gauge(PROMETHEUS_VIRTUAL_MEMORY, "Virtual memory size in bytes.", ["worker"]),
20+
),
21+
(
22+
PROMETHEUS_RESIDENT_MEMORY,
23+
Gauge(PROMETHEUS_RESIDENT_MEMORY, "Resident memory size in bytes.", ["worker"]),
24+
),
25+
]
26+
27+
28+
async def collect_system_metrics(app: Sanic, name: str) -> None:
29+
"""Collect prometheus system metrics in a background task.
30+
31+
This is similar to the official prometheus_client implementation, which doesn't support CPU/Mem metrics
32+
in multiprocess mode
33+
"""
34+
try:
35+
async with aiofiles.open("/proc/self/stat", "rb") as stat:
36+
content = await stat.read()
37+
parts = content.split(b")")[-1].split()
38+
app.ctx.metrics[PROMETHEUS_VIRTUAL_MEMORY].labels({name}).set(float(parts[20]))
39+
app.ctx.metrics[PROMETHEUS_RESIDENT_MEMORY].labels({name}).set(float(parts[21]) * _PAGESIZE)
40+
except OSError:
41+
pass
42+
43+
44+
async def collect_system_metrics_task(app: Sanic) -> None:
45+
"""Background task to collect metrics."""
46+
while True:
47+
await collect_system_metrics(app, app.m.name)
48+
await asyncio.sleep(5)
49+
50+
51+
def setup_prometheus(app: Sanic) -> None:
52+
"""Setup prometheus monitoring.
53+
54+
We add custom metrics collection wo sanic workers and to the send_messages background job, since
55+
prometheus does not collect cpy/memory metrics when in multiprocess mode.
56+
"""
57+
app.add_task(collect_system_metrics_task) # type:ignore[arg-type]
58+
monitor(
59+
app,
60+
endpoint_type="url",
61+
multiprocess_mode="all",
62+
is_middleware=True,
63+
metrics_list=PROMETHEUS_METRICS_LIST,
64+
).expose_endpoint()
65+
66+
67+
def setup_app_metrics(app: Sanic) -> None:
68+
"""Setup metrics for a Sanic app.
69+
70+
NOTE: this should only be called for manually created workers (with app.manager.manage(...))
71+
"""
72+
app.ctx.metrics = {}
73+
init(app, metrics_list=PROMETHEUS_METRICS_LIST, metrics=BaseMetrics)

0 commit comments

Comments
 (0)