Skip to content

Commit

Permalink
exporter: With no exporter running, metrics collection should no-op
Browse files Browse the repository at this point in the history
The new import tests exposed a ValueError raised by prometheus_client regarding
duplicate metrics in the multiprocess directory.

Signed-off-by: Zack Cerza <[email protected]>
  • Loading branch information
zmc committed Jul 26, 2024
1 parent f453952 commit 4f7b8ca
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 77 deletions.
5 changes: 4 additions & 1 deletion teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,10 @@ def lock_machines(job_config):
fake_ctx = supervisor.create_fake_context(job_config, block=True)
machine_type = job_config["machine_type"]
count = len(job_config['roles'])
with exporter.NodeLockingTime.labels(machine_type, count).time():
with exporter.NodeLockingTime().time(
machine_type=machine_type,
count=count,
):
lock_ops.block_and_lock_machines(
fake_ctx,
count,
Expand Down
10 changes: 5 additions & 5 deletions teuthology/dispatcher/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ def main(args):
# If a job (e.g. from the nop suite) doesn't need nodes, avoid
# submitting a zero here.
if node_count:
with exporter.NodeReimagingTime.labels(
job_config["machine_type"],
node_count
).time():
with exporter.NodeReimagingTime().time(
machine_type=job_config["machine_type"],
node_count=node_count,
):
reimage(job_config)
else:
reimage(job_config)
Expand All @@ -57,7 +57,7 @@ def main(args):
try:
suite = job_config.get("suite")
if suite:
with exporter.JobTime.labels(suite).time():
with exporter.JobTime().time(suite=suite):
return run_job(
job_config,
args.bin_path,
Expand Down
219 changes: 159 additions & 60 deletions teuthology/exporter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import itertools
import logging
import os
Expand All @@ -15,7 +16,6 @@


PROMETHEUS_MULTIPROC_DIR = Path("~/.cache/teuthology-exporter").expanduser()
PROMETHEUS_MULTIPROC_DIR.mkdir(parents=True, exist_ok=True)
os.environ["PROMETHEUS_MULTIPROC_DIR"] = str(PROMETHEUS_MULTIPROC_DIR)

# We can't import prometheus_client until after we set PROMETHEUS_MULTIPROC_DIR
Expand All @@ -28,18 +28,17 @@
CollectorRegistry,
)

registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)

MACHINE_TYPES = list(config.active_machine_types)
REGISTRY = None


class TeuthologyExporter:
port = 61764 # int(''.join([str((ord(c) - 100) % 10) for c in "teuth"]))

def __init__(self, interval=60):
for file in PROMETHEUS_MULTIPROC_DIR.iterdir():
file.unlink()
if REGISTRY:
for file in PROMETHEUS_MULTIPROC_DIR.iterdir():
file.unlink()
self.interval = interval
self.metrics = [
Dispatchers(),
Expand All @@ -50,7 +49,8 @@ def __init__(self, interval=60):
self._created_time = time.perf_counter()

def start(self):
start_http_server(self.port, registry=registry)
if REGISTRY:
start_http_server(self.port, registry=REGISTRY)
self.loop()

def update(self):
Expand Down Expand Up @@ -85,24 +85,61 @@ def loop(self):
def restart(self):
# Use the dispatcher's restart function - note that by using this here,
# it restarts the exporter, *not* the dispatcher.
return teuthology.dispatcher.restart(log=log)
if REGISTRY:
return teuthology.dispatcher.restart(log=log)


class SingletonMeta(type):
    """Metaclass that gives every class using it exactly one instance.

    The first call to a class constructs the instance normally; every later
    call returns that same object and ignores the arguments it was given.
    """

    # Maps each class to its sole instance. The dict is defined on the
    # metaclass, so it is shared by every class that uses SingletonMeta.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of *cls*, creating it on first use."""
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class TeuthologyMetric(metaclass=SingletonMeta):
    """Base class for metrics; each subclass is a singleton.

    Every public method checks the module-level ``REGISTRY`` first and does
    nothing when it is falsy, so callers may use metrics unconditionally.
    Subclasses implement the underscore-prefixed hooks they support; the
    defaults raise ``NotImplementedError``.
    """

    def __init__(self):
        if REGISTRY:
            self._init()

    def _init(self):
        """Create the underlying metric object(s)."""
        raise NotImplementedError

    def update(self):
        """Refresh the metric's value; no-op without a registry."""
        if REGISTRY:
            self._update()

    def _update(self):
        raise NotImplementedError

    def record(self, **kwargs):
        """Record one event described by *kwargs*; no-op without a registry."""
        if REGISTRY:
            self._record(**kwargs)

    def _record(self, **_):
        raise NotImplementedError

    @contextlib.contextmanager
    def time(self, **labels):
        """Context manager that times its body via the ``_time()`` hook.

        Without a registry the body simply runs untimed. A ``ValueError``
        raised while obtaining the timer is logged and the body still runs
        untimed, so a metrics problem cannot abort the work being measured.
        Exceptions raised by the body itself propagate unchanged.
        """
        if not REGISTRY:
            yield
            return
        try:
            timer = self._time(**labels)
            if not hasattr(timer, "__enter__"):
                # Hooks written with `yield` hand back a generator that wraps
                # the real timer; advance it once to obtain the timer itself.
                timer = next(timer)
        except ValueError:
            # NOTE(review): prometheus_client is understood to raise
            # ValueError for wrong or missing label names - confirm.
            log.exception("Could not start timer for %s", type(self).__name__)
            timer = contextlib.nullcontext()
        # Entering the timer is what makes the duration get observed;
        # merely yielding it to the caller never started it.
        with timer:
            yield

    def _time(self, **labels):
        """Return a context manager that observes the elapsed time."""
        raise NotImplementedError


class Dispatchers(TeuthologyMetric):
def __init__(self):
def _init(self):
self.metric = Gauge(
"teuthology_dispatchers", "Teuthology Dispatchers", ["machine_type"]
"teuthology_dispatchers",
"Teuthology Dispatchers",
["machine_type"],
)

def update(self):
def _update(self):
dispatcher_procs = teuthology.dispatcher.find_dispatcher_processes()
for machine_type in MACHINE_TYPES:
self.metric.labels(machine_type).set(
Expand All @@ -111,30 +148,31 @@ def update(self):


class BeanstalkQueue(TeuthologyMetric):
def __init__(self):
def _init(self):
self.length = Gauge(
"beanstalk_queue_length", "Beanstalk Queue Length", ["machine_type"]
"beanstalk_queue_length",
"Beanstalk Queue Length",
["machine_type"],
)
self.paused = Gauge(
"beanstalk_queue_paused", "Beanstalk Queue is Paused", ["machine_type"]
)

def update(self):
def _update(self):
for machine_type in MACHINE_TYPES:
queue_stats = beanstalk.stats_tube(beanstalk.connect(), machine_type)
self.length.labels(machine_type).set(queue_stats["count"])
self.paused.labels(machine_type).set(1 if queue_stats["paused"] else 0)


class JobProcesses(TeuthologyMetric):
def __init__(self):
def _init(self):
self.metric = Gauge(
"teuthology_job_processes",
"Teuthology Job Processes",
)

def update(self):

def _update(self):
attrs = ["pid", "cmdline"]
total = 0
for proc in psutil.process_iter(attrs=attrs):
Expand Down Expand Up @@ -169,12 +207,14 @@ def _match(proc):


class Nodes(TeuthologyMetric):
def __init__(self):
def _init(self):
self.metric = Gauge(
"teuthology_nodes", "Teuthology Nodes", ["machine_type", "locked", "up"]
"teuthology_nodes",
"Teuthology Nodes",
["machine_type", "locked", "up"],
)

def update(self):
def _update(self):
for machine_type in MACHINE_TYPES:
nodes = list_locks(machine_type=machine_type)
for up, locked in itertools.product([True, False], [True, False]):
Expand All @@ -183,67 +223,126 @@ def update(self):
)


class _JobResults(TeuthologyMetric):
def __init__(self):
class JobResults(TeuthologyMetric):
def _init(self):
self.metric = Counter(
"teuthology_job_results",
"Teuthology Job Results",
["machine_type", "status"],
)

# As this is to be used within job processes, we implement record() rather than update()
def record(self, machine_type, status):
self.metric.labels(machine_type=machine_type, status=status).inc()
def _record(self, **labels):
self.metric.labels(**labels).inc()


JobResults = _JobResults()


class _NodeReimagingResults(TeuthologyMetric):
def __init__(self):
class NodeReimagingResults(TeuthologyMetric):
def _init(self):
self.metric = Counter(
"teuthology_reimaging_results",
"Teuthology Reimaging Results",
["machine_type", "status"],
)

# As this is to be used within job processes, we implement record() rather than update()
def record(self, machine_type, status):
self.metric.labels(machine_type=machine_type, status=status).inc()
def _record(self, **labels):
if REGISTRY:
self.metric.labels(**labels).inc()


NodeReimagingResults = _NodeReimagingResults()
class NodeLockingTime(TeuthologyMetric):
def _init(self):
self.metric = Summary(
"teuthology_node_locking_duration_seconds",
"Time spent waiting to lock nodes",
["machine_type", "count"],
)

NodeLockingTime = Summary(
"teuthology_node_locking_duration_seconds",
"Time spent waiting to lock nodes",
["machine_type", "count"],
)
def _time(self, **labels):
yield self.metric.labels(**labels).time()

NodeReimagingTime = Summary(
"teuthology_node_reimaging_duration_seconds",
"Time spent reimaging nodes",
["machine_type", "count"],
)

JobTime = Summary(
"teuthology_job_duration_seconds",
"Time spent executing a job",
["suite"],
)
class NodeReimagingTime(TeuthologyMetric):
def _init(self):
self.metric = Summary(
"teuthology_node_reimaging_duration_seconds",
"Time spent reimaging nodes",
["machine_type", "count"],
)

TaskTime = Summary(
"teuthology_task_duration_seconds",
"Time spent executing a task",
["name", "phase"],
)
def _time(self, **labels):
yield self.metric.labels(**labels).time()

BootstrapTime = Summary(
"teuthology_bootstrap_duration_seconds",
"Time spent running teuthology's bootstrap script",
)

class JobTime(TeuthologyMetric):
    """Summary metric for job execution time, labelled by ``suite``."""

    def _init(self):
        self.metric = Summary(
            "teuthology_job_duration_seconds",
            "Time spent executing a job",
            ["suite"],
        )

    def _time(self, **labels):
        """Return the timer context manager for the child chosen by *labels*."""
        # `return`, not `yield`: with `yield` this method becomes a generator
        # function, and calling it would hand back an unstarted generator
        # without ever touching the metric.
        return self.metric.labels(**labels).time()


class TaskTime(TeuthologyMetric):
    """Summary metric for task execution time, labelled by ``name``/``phase``."""

    def _init(self):
        self.metric = Summary(
            "teuthology_task_duration_seconds",
            "Time spent executing a task",
            ["name", "phase"],
        )

    def _time(self, **labels):
        """Return the timer context manager for the child chosen by *labels*."""
        # `return`, not `yield`: with `yield` this method becomes a generator
        # function, and calling it would hand back an unstarted generator
        # without ever touching the metric.
        return self.metric.labels(**labels).time()


def main(args):
class BootstrapTime(TeuthologyMetric):
    """Summary metric for time spent in teuthology's bootstrap script."""

    def _init(self):
        # Declared without label names, unlike the other timing metrics.
        self.metric = Summary(
            "teuthology_bootstrap_duration_seconds",
            "Time spent running teuthology's bootstrap script",
        )

    def _time(self, **labels):
        """Return the timer context manager for this metric."""
        # The metric has no label names, so `.labels()` must not be called in
        # the normal (label-free) case; time the parent metric directly.
        # Labels passed by mistake are still forwarded so the metric client
        # can reject them.
        metric = self.metric.labels(**labels) if labels else self.metric
        # `return`, not `yield`: a generator here would never run this code.
        return metric.time()


def find_exporter_process() -> int | None:
    """Return the PID of a teuthology-exporter process owned by this user.

    A process matches when any element of its command line has the basename
    ``teuthology-exporter`` and the current UID is among the process's UIDs.
    The first match is returned; ``None`` means no such process was found.
    """
    own_uid = os.getuid()
    for proc in psutil.process_iter(attrs=["pid", "uids", "cmdline"]):
        info = proc.info
        cmdline = info["cmdline"]
        uids = info["uids"]
        # psutil fills in None for attributes it was not allowed to read
        # (see the process_iter `attrs`/`ad_value` docs), so a None here
        # means "unknown" and the process is skipped rather than crashing
        # the membership test below.
        if not cmdline or not uids:
            continue
        if own_uid not in uids:
            continue
        if any(arg.split("/")[-1] == "teuthology-exporter" for arg in cmdline):
            return info["pid"]
    return None


def main(args) -> int:
    """Run the exporter and return a process exit code.

    Args:
        args: mapping of command-line options; ``--interval`` is read and
            converted to ``int``.

    Returns:
        2 if another teuthology-exporter process is already running,
        1 if the exporter raised an exception, 0 otherwise.
    """
    pid = find_exporter_process()
    # The search can find this very process; only a different PID counts as
    # an already-running exporter.
    if pid and pid != os.getpid():
        log.error(f"teuthology-exporter is already running as PID {pid}")
        return 2
    exporter = TeuthologyExporter(interval=int(args["--interval"]))
    # start() is called exactly once, inside the try, so that any failure is
    # logged and reported through the exit code.
    try:
        exporter.start()
    except Exception:
        log.exception("Exporter failed")
        return 1
    return 0


# Only set up multiprocess collection when an exporter process exists.
# Otherwise REGISTRY keeps its earlier value and every metric method that
# checks it stays a no-op.
pid = find_exporter_process()
if pid:
    PROMETHEUS_MULTIPROC_DIR.mkdir(parents=True, exist_ok=True)
    REGISTRY = CollectorRegistry()
    multiprocess.MultiProcessCollector(REGISTRY)
6 changes: 3 additions & 3 deletions teuthology/kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ def kill_job(run_name, job_id, archive_base=None, owner=None, skip_unlock=False)
return
report.try_push_job_info(job_info, dict(status="dead"))
if 'machine_type' in job_info:
teuthology.exporter.JobResults.record(
job_info["machine_type"],
job_info.get("status", "dead")
teuthology.exporter.JobResults().record(
machine_type=job_info["machine_type"],
status=job_info.get("status", "dead")
)
else:
log.warn(f"Job {job_id} has no machine_type; cannot report via Prometheus")
Expand Down
6 changes: 3 additions & 3 deletions teuthology/provision/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def reimage(ctx, machine_name, machine_type):
# or SystemExit
raise
finally:
teuthology.exporter.NodeReimagingResults.record(
machine_type,
status,
teuthology.exporter.NodeReimagingResults().record(
machine_type=machine_type,
status=status,
)
return result

Expand Down
Loading

0 comments on commit 4f7b8ca

Please sign in to comment.