diff --git a/requirements.txt b/requirements.txt index 645e6612d..f9ec89a82 100644 --- a/requirements.txt +++ b/requirements.txt @@ -180,6 +180,8 @@ toml==0.10.2 # via teuthology (pyproject.toml) tox==4.11.4 # via teuthology (pyproject.toml) +types-psutil==6.0.0.20240621 + # via teuthology (pyproject.toml) urllib3==1.26.18 # via # botocore diff --git a/scripts/test/test_exporter_.py b/scripts/test/test_exporter_.py new file mode 100644 index 000000000..b0611a337 --- /dev/null +++ b/scripts/test/test_exporter_.py @@ -0,0 +1,5 @@ +from script import Script + + +class TestExporter(Script): + script_name = 'teuthology-exporter' diff --git a/setup.cfg b/setup.cfg index 73b006030..63b61106a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -56,6 +56,7 @@ install_requires = python-dateutil requests>2.13.0 sentry-sdk + types-psutil urllib3>=1.25.4,<1.27 # For botocore scripts = teuthology/task/install/bin/adjust-ulimits diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index e97d17513..15c4003c9 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -313,7 +313,10 @@ def lock_machines(job_config): fake_ctx = supervisor.create_fake_context(job_config, block=True) machine_type = job_config["machine_type"] count = len(job_config['roles']) - with exporter.NodeLockingTime.labels(machine_type, count).time(): + with exporter.NodeLockingTime().time( + machine_type=machine_type, + count=count, + ): lock_ops.block_and_lock_machines( fake_ctx, count, diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 2eb52f663..d2e86de36 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -44,10 +44,10 @@ def main(args): # If a job (e.g. from the nop suite) doesn't need nodes, avoid # submitting a zero here. if node_count: - with exporter.NodeReimagingTime.labels( - job_config["machine_type"], - node_count - ).time(): + with exporter.NodeReimagingTime().time( + machine_type=job_config["machine_type"], + node_count=node_count, + ): reimage(job_config) else: reimage(job_config) @@ -57,7 +57,7 @@ def main(args): try: suite = job_config.get("suite") if suite: - with exporter.JobTime.labels(suite).time(): + with exporter.JobTime().time(suite=suite): return run_job( job_config, args.bin_path, diff --git a/teuthology/exporter.py b/teuthology/exporter.py index 017908f62..30aead875 100644 --- a/teuthology/exporter.py +++ b/teuthology/exporter.py @@ -1,3 +1,4 @@ +import contextlib import itertools import logging import os @@ -15,7 +16,6 @@ PROMETHEUS_MULTIPROC_DIR = Path("~/.cache/teuthology-exporter").expanduser() -PROMETHEUS_MULTIPROC_DIR.mkdir(parents=True, exist_ok=True) os.environ["PROMETHEUS_MULTIPROC_DIR"] = str(PROMETHEUS_MULTIPROC_DIR) # We can't import prometheus_client until after we set PROMETHEUS_MULTIPROC_DIR @@ -28,18 +28,17 @@ CollectorRegistry, ) -registry = CollectorRegistry() -multiprocess.MultiProcessCollector(registry) - MACHINE_TYPES = list(config.active_machine_types) +REGISTRY = None class TeuthologyExporter: port = 61764 # int(''.join([str((ord(c) - 100) % 10) for c in "teuth"])) def __init__(self, interval=60): - for file in PROMETHEUS_MULTIPROC_DIR.iterdir(): - file.unlink() + if REGISTRY: + for file in PROMETHEUS_MULTIPROC_DIR.iterdir(): + file.unlink() self.interval = interval self.metrics = [ Dispatchers(), @@ -50,7 +49,8 @@ def __init__(self, interval=60): self._created_time = time.perf_counter() def start(self): - start_http_server(self.port, registry=registry) + if REGISTRY: + start_http_server(self.port, registry=REGISTRY) self.loop() def update(self): @@ -85,24 +85,61 @@ def loop(self): def restart(self): # Use the dispatcher's restart function - note that by using this here, # it restarts the exporter, *not* the dispatcher. - return teuthology.dispatcher.restart(log=log) + if REGISTRY: + return teuthology.dispatcher.restart(log=log) + + +class SingletonMeta(type): + _instances = {} + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + instance = super().__call__(*args, **kwargs) + cls._instances[cls] = instance + return cls._instances[cls] -class TeuthologyMetric: +class TeuthologyMetric(metaclass=SingletonMeta): def __init__(self): - pass + if REGISTRY: + self._init() + + def _init(self): + raise NotImplementedError def update(self): + if REGISTRY: + self._update() + + def _update(self): + raise NotImplementedError + + def record(self, **kwargs): + if REGISTRY: + self._record(**kwargs) + + def _record(self, **_): + raise NotImplementedError + + @contextlib.contextmanager + def time(self, **labels): + if REGISTRY: + yield self._time(**labels) + else: + yield + + def _time(self): raise NotImplementedError class Dispatchers(TeuthologyMetric): - def __init__(self): + def _init(self): self.metric = Gauge( - "teuthology_dispatchers", "Teuthology Dispatchers", ["machine_type"] + "teuthology_dispatchers", + "Teuthology Dispatchers", + ["machine_type"], ) - def update(self): + def _update(self): dispatcher_procs = teuthology.dispatcher.find_dispatcher_processes() for machine_type in MACHINE_TYPES: self.metric.labels(machine_type).set( @@ -111,15 +148,17 @@ def update(self): class BeanstalkQueue(TeuthologyMetric): - def __init__(self): + def _init(self): self.length = Gauge( - "beanstalk_queue_length", "Beanstalk Queue Length", ["machine_type"] + "beanstalk_queue_length", + "Beanstalk Queue Length", + ["machine_type"], ) self.paused = Gauge( "beanstalk_queue_paused", "Beanstalk Queue is Paused", ["machine_type"] ) - def update(self): + def _update(self): for machine_type in MACHINE_TYPES: queue_stats = beanstalk.stats_tube(beanstalk.connect(), machine_type) self.length.labels(machine_type).set(queue_stats["count"]) @@ -127,14 +166,13 @@ def update(self): class JobProcesses(TeuthologyMetric): - def __init__(self): + def _init(self): self.metric = Gauge( "teuthology_job_processes", "Teuthology Job Processes", ) - def update(self): - + def _update(self): attrs = ["pid", "cmdline"] total = 0 for proc in psutil.process_iter(attrs=attrs): @@ -148,6 +186,8 @@ def _match(proc): cmdline = proc.cmdline() except psutil.ZombieProcess: return False + except psutil.AccessDenied: + return False if not len(cmdline) > 1: return False if not cmdline[1].endswith("teuthology"): @@ -166,12 +206,14 @@ def _match(proc): class Nodes(TeuthologyMetric): - def __init__(self): + def _init(self): self.metric = Gauge( - "teuthology_nodes", "Teuthology Nodes", ["machine_type", "locked", "up"] + "teuthology_nodes", + "Teuthology Nodes", + ["machine_type", "locked", "up"], ) - def update(self): + def _update(self): for machine_type in MACHINE_TYPES: nodes = list_locks(machine_type=machine_type) for up, locked in itertools.product([True, False], [True, False]): @@ -180,8 +222,8 @@ def update(self): ) -class _JobResults(TeuthologyMetric): - def __init__(self): +class JobResults(TeuthologyMetric): + def _init(self): self.metric = Counter( "teuthology_job_results", "Teuthology Job Results", @@ -189,15 +231,12 @@ def __init__(self): ) # As this is to be used within job processes, we implement record() rather than update() - def record(self, machine_type, status): - self.metric.labels(machine_type=machine_type, status=status).inc() + def _record(self, **labels): + self.metric.labels(**labels).inc() -JobResults = _JobResults() - - -class _NodeReimagingResults(TeuthologyMetric): - def __init__(self): +class NodeReimagingResults(TeuthologyMetric): + def _init(self): self.metric = Counter( "teuthology_reimaging_results", "Teuthology Reimaging Results", @@ -205,42 +244,104 @@ def __init__(self): ) # As this is to be used within job processes, we implement record() rather than update() - def record(self, machine_type, status): - self.metric.labels(machine_type=machine_type, status=status).inc() + def _record(self, **labels): + if REGISTRY: + self.metric.labels(**labels).inc() -NodeReimagingResults = _NodeReimagingResults() +class NodeLockingTime(TeuthologyMetric): + def _init(self): + self.metric = Summary( + "teuthology_node_locking_duration_seconds", + "Time spent waiting to lock nodes", + ["machine_type", "count"], + ) -NodeLockingTime = Summary( - "teuthology_node_locking_duration_seconds", - "Time spent waiting to lock nodes", - ["machine_type", "count"], -) + def _time(self, **labels): + yield self.metric.labels(**labels).time() -NodeReimagingTime = Summary( - "teuthology_node_reimaging_duration_seconds", - "Time spent reimaging nodes", - ["machine_type", "count"], -) -JobTime = Summary( - "teuthology_job_duration_seconds", - "Time spent executing a job", - ["suite"], -) +class NodeReimagingTime(TeuthologyMetric): + def _init(self): + self.metric = Summary( + "teuthology_node_reimaging_duration_seconds", + "Time spent reimaging nodes", + ["machine_type", "count"], + ) -TaskTime = Summary( - "teuthology_task_duration_seconds", - "Time spent executing a task", - ["name", "phase"], -) + def _time(self, **labels): + yield self.metric.labels(**labels).time() -BootstrapTime = Summary( - "teuthology_bootstrap_duration_seconds", - "Time spent running teuthology's bootstrap script", -) + +class JobTime(TeuthologyMetric): + def _init(self): + self.metric = Summary( + "teuthology_job_duration_seconds", + "Time spent executing a job", + ["suite"], + ) + + def _time(self, **labels): + yield self.metric.labels(**labels).time() + + +class TaskTime(TeuthologyMetric): + def _init(self): + self.metric = Summary( + "teuthology_task_duration_seconds", + "Time spent executing a task", + ["name", "phase"], + ) + + def _time(self, **labels): + yield self.metric.labels(**labels).time() -def main(args): +class BootstrapTime(TeuthologyMetric): + def _init(self): + self.metric = Summary( + "teuthology_bootstrap_duration_seconds", + "Time spent running teuthology's bootstrap script", + ) + + def _time(self, **labels): + yield self.metric.labels(**labels).time() + + +def find_exporter_process() -> int | None: + attrs = ['pid', 'uids', 'cmdline'] + for proc in psutil.process_iter(attrs=attrs): + try: + cmdline = proc.info['cmdline'] + except psutil.AccessDenied: + continue + pid = proc.info['pid'] + if not cmdline: + continue + if not [i for i in cmdline if i.split('/')[-1] == 'teuthology-exporter']: + continue + if os.getuid() not in proc.info['uids']: + continue + return pid + + +def main(args) -> int: + if pid := find_exporter_process(): + if os.getpid() != pid: + log.error(f"teuthology-exporter is already running as PID {pid}") + return 2 exporter = TeuthologyExporter(interval=int(args["--interval"])) - exporter.start() + try: + exporter.start() + except Exception: + log.exception("Exporter failed") + return 1 + else: + return 0 + + +pid = find_exporter_process() +if pid: + PROMETHEUS_MULTIPROC_DIR.mkdir(parents=True, exist_ok=True) + REGISTRY = CollectorRegistry() + multiprocess.MultiProcessCollector(REGISTRY) diff --git a/teuthology/kill.py b/teuthology/kill.py index a18e0ac8a..137e49080 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -95,9 +95,9 @@ def kill_job(run_name, job_id, archive_base=None, owner=None, skip_unlock=False) return report.try_push_job_info(job_info, dict(status="dead")) if 'machine_type' in job_info: - teuthology.exporter.JobResults.record( - job_info["machine_type"], - job_info.get("status", "dead") + teuthology.exporter.JobResults().record( + machine_type=job_info["machine_type"], + status=job_info.get("status", "dead") ) else: log.warn(f"Job {job_id} has no machine_type; cannot report via Prometheus") diff --git a/teuthology/provision/__init__.py b/teuthology/provision/__init__.py index 5afc5ed7e..48392eaba 100644 --- a/teuthology/provision/__init__.py +++ b/teuthology/provision/__init__.py @@ -48,9 +48,9 @@ def reimage(ctx, machine_name, machine_type): # or SystemExit raise finally: - teuthology.exporter.NodeReimagingResults.record( - machine_type, - status, + teuthology.exporter.NodeReimagingResults().record( + machine_type=machine_type, + status=status, ) return result diff --git a/teuthology/repo_utils.py b/teuthology/repo_utils.py index 6ab0747ad..79fd92eda 100644 --- a/teuthology/repo_utils.py +++ b/teuthology/repo_utils.py @@ -440,7 +440,7 @@ def fetch_teuthology(branch, commit=None, lock=True): def bootstrap_teuthology(dest_path): - with exporter.BootstrapTime.time(): + with exporter.BootstrapTime().time(): log.info("Bootstrapping %s", dest_path) # This magic makes the bootstrap script not attempt to clobber an # existing virtualenv. But the branch's bootstrap needs to actually diff --git a/teuthology/report.py b/teuthology/report.py index e5382133b..f0a447201 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -474,7 +474,10 @@ def push_job_info(run_name, job_id, job_info, base_uri=None): reporter.report_job(run_name, job_id, job_info) status = get_status(job_info) if status in ["pass", "fail", "dead"] and "machine_type" in job_info: - teuthology.exporter.JobResults.record(job_info["machine_type"], status) + teuthology.exporter.JobResults().record( + machine_type=job_info["machine_type"], + status=status, + ) def try_push_job_info(job_config, extra_info=None): @@ -584,7 +587,10 @@ def try_mark_run_dead(run_name): log.info("Marking job {job_id} as dead".format(job_id=job_id)) reporter.report_job(run_name, job['job_id'], dead=True) if "machine_type" in job: - teuthology.exporter.JobResults.record(job["machine_type"], job["status"]) + teuthology.exporter.JobResults().record( + machine_type=job["machine_type"], + status=job["status"], + ) except report_exceptions: log.exception("Could not mark job as dead: {job_id}".format( job_id=job_id)) diff --git a/teuthology/run_tasks.py b/teuthology/run_tasks.py index 1af2c6820..267d8fd3f 100644 --- a/teuthology/run_tasks.py +++ b/teuthology/run_tasks.py @@ -105,7 +105,10 @@ def run_tasks(tasks, ctx): manager = run_one_task(taskname, ctx=ctx, config=config) if hasattr(manager, '__enter__'): stack.append((taskname, manager)) - with exporter.TaskTime.labels(taskname, "enter").time(): + with exporter.TaskTime().time( + name=taskname, + phase="enter" + ): manager.__enter__() except BaseException as e: if isinstance(e, ConnectionLostError): @@ -150,7 +153,10 @@ def run_tasks(tasks, ctx): log.debug('Unwinding manager %s', taskname) timer.mark('%s exit' % taskname) try: - with exporter.TaskTime.labels(taskname, "exit").time(): + with exporter.TaskTime().time( + name=taskname, + phase="exit" + ): suppress = manager.__exit__(*exc_info) except Exception as e: if isinstance(e, ConnectionLostError): diff --git a/teuthology/test/test_imports.py b/teuthology/test/test_imports.py index b73ade15e..cb25c8d0a 100644 --- a/teuthology/test/test_imports.py +++ b/teuthology/test/test_imports.py @@ -1,7 +1,8 @@ +import importlib import pytest +import sys from pathlib import Path -from subprocess import check_call from typing import List root = Path("./teuthology") @@ -26,4 +27,5 @@ def find_modules() -> List[str]: @pytest.mark.parametrize("module", find_modules()) def test_import_modules(module): - check_call(["python3", "-c", f"import {module}"]) + importlib.import_module(module) + assert module in sys.modules