diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 59f8ae3279..cb56c343b6 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -13,7 +13,6 @@ setup_log_file, install_except_hook, # modules - beanstalk, exporter, report, repo_utils, @@ -21,9 +20,12 @@ from teuthology.config import config as teuth_config from teuthology.dispatcher import supervisor from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries -from teuthology.lock import ops as lock_ops from teuthology.util.time import parse_timestamp from teuthology import safepath +from teuthology.jobqueue.base import QueueDirection +import teuthology.machines +import teuthology.jobqueue.choice + log = logging.getLogger(__name__) start_time = datetime.datetime.now(datetime.timezone.utc) @@ -88,8 +90,7 @@ def main(args): load_config(archive_dir=archive_dir) - connection = beanstalk.connect() - beanstalk.watch_tube(connection, args.tube) + queue = teuthology.jobqueue.choice.from_config(args.tube, QueueDirection.OUT) result_proc = None if teuth_config.teuthology_path is None: @@ -118,7 +119,7 @@ def main(args): if rc is not None: worst_returncode = max([worst_returncode, rc]) job_procs.remove(proc) - job = connection.reserve(timeout=60) + job = queue.get() if job is None: if args.exit_on_empty_queue and not job_procs: log.info("Queue is empty and no supervisor processes running; exiting!") @@ -129,8 +130,8 @@ def main(args): job.bury() job_id = job.jid log.info('Reserved job %d', job_id) - log.info('Config is: %s', job.body) - job_config = yaml.safe_load(job.body) + job_config = job.job_config() + log.info('Config is: %s', job_config) job_config['job_id'] = str(job_id) if job_config.get('stop_worker'): @@ -182,7 +183,7 @@ def main(args): log.exception(error_message) if 'targets' in job_config: node_names = job_config["targets"].keys() - lock_ops.unlock_safe( + teuthology.machines.unlock_safe( node_names, job_config["owner"], job_config["name"], @@ -345,7 +346,7 @@ def lock_machines(job_config): machine_type=machine_type, count=count, ): - lock_ops.block_and_lock_machines( + teuthology.machines.must_reserve( fake_ctx, count, machine_type, diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index b89c39ac5a..5b8291cdaf 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -14,11 +14,10 @@ from teuthology import setup_log_file, install_except_hook from teuthology.misc import get_user, archive_logs, compress_logs from teuthology.config import FakeNamespace -from teuthology.lock import ops as lock_ops from teuthology.task import internal from teuthology.misc import decanonicalize_hostname as shortname -from teuthology.lock import query from teuthology.util import sentry +import teuthology.machines log = logging.getLogger(__name__) @@ -205,7 +204,7 @@ def check_for_reimage_failures_and_mark_down(targets, count=10): continue # Mark machine down machine_name = shortname(k) - lock_ops.update_lock( + teuthology.machines.update_lock( machine_name, description='reimage failed {0} times'.format(count), status='down', @@ -223,7 +222,7 @@ def reimage(job_config): report.try_push_job_info(ctx.config, dict(status='waiting')) targets = job_config['targets'] try: - reimaged = lock_ops.reimage_machines(ctx, targets, job_config['machine_type']) + reimaged = teuthology.machines.reimage_machines(ctx, targets, job_config['machine_type']) except Exception as e: 
log.exception('Reimaging error. Nuking machines...') unlock_targets(job_config) @@ -251,7 +250,9 @@ def unlock_targets(job_config): :param job_config: dict, job config data """ - machine_statuses = query.get_statuses(job_config['targets'].keys()) + machine_statuses = teuthology.machines.machine_statuses( + job_config['targets'].keys() + ) locked = [] for status in machine_statuses: name = shortname(status['name']) @@ -269,7 +270,12 @@ def unlock_targets(job_config): return if job_config.get("unlock_on_failure", True): log.info('Unlocking machines...') - lock_ops.unlock_safe(locked, job_config["owner"], job_config["name"], job_config["job_id"]) + teuthology.machines.unlock_safe( + locked, + job_config["owner"], + job_config["name"], + job_config["job_id"], + ) def run_with_watchdog(process, job_config): diff --git a/teuthology/dispatcher/test/test_dispatcher.py b/teuthology/dispatcher/test/test_dispatcher.py index 58f58cf9cf..fbedc162ca 100644 --- a/teuthology/dispatcher/test/test_dispatcher.py +++ b/teuthology/dispatcher/test/test_dispatcher.py @@ -100,8 +100,8 @@ def build_fake_jobs(self, m_connection, m_job, job_bodies): @patch("beanstalkc.Job", autospec=True) @patch("teuthology.repo_utils.fetch_qa_suite") @patch("teuthology.repo_utils.fetch_teuthology") - @patch("teuthology.dispatcher.beanstalk.watch_tube") - @patch("teuthology.dispatcher.beanstalk.connect") + @patch("teuthology.jobqueue.beanstalk.beanstalk.watch_tube") + @patch("teuthology.jobqueue.beanstalk.beanstalk.connect") @patch("os.path.isdir", return_value=True) @patch("teuthology.dispatcher.setup_log_file") def test_main_loop( @@ -141,8 +141,8 @@ def test_main_loop( @patch("beanstalkc.Job", autospec=True) @patch("teuthology.repo_utils.fetch_qa_suite") @patch("teuthology.repo_utils.fetch_teuthology") - @patch("teuthology.dispatcher.beanstalk.watch_tube") - @patch("teuthology.dispatcher.beanstalk.connect") + @patch("teuthology.jobqueue.beanstalk.beanstalk.watch_tube") + @patch("teuthology.jobqueue.beanstalk.beanstalk.connect") @patch("os.path.isdir", return_value=True) @patch("teuthology.dispatcher.setup_log_file") def test_main_loop_13925( diff --git a/teuthology/exporter.py b/teuthology/exporter.py index 30aead8756..519a707584 100644 --- a/teuthology/exporter.py +++ b/teuthology/exporter.py @@ -9,8 +9,8 @@ import teuthology.beanstalk as beanstalk import teuthology.dispatcher +import teuthology.machines from teuthology.config import config -from teuthology.lock.query import list_locks log = logging.getLogger(__name__) @@ -215,7 +215,7 @@ def _init(self): def _update(self): for machine_type in MACHINE_TYPES: - nodes = list_locks(machine_type=machine_type) + nodes = teuthology.machines.machine_list(machine_type=machine_type) for up, locked in itertools.product([True, False], [True, False]): self.metric.labels(machine_type=machine_type, up=up, locked=locked).set( len([n for n in nodes if n["up"] is up and n["locked"] is locked]) diff --git a/teuthology/jobqueue/__init__.py b/teuthology/jobqueue/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/teuthology/jobqueue/base.py b/teuthology/jobqueue/base.py new file mode 100644 index 0000000000..165069ef3f --- /dev/null +++ b/teuthology/jobqueue/base.py @@ -0,0 +1,42 @@ +from typing import Dict, Any, Optional + +import abc +import enum + + +JobSpec = Dict[str, Any] + + +class QueueDirection(enum.Enum): + IN = 1 + OUT = 2 + BIDIR = 3 + + +class Job(abc.ABC): + jid: int + + @abc.abstractmethod + def bury(self) -> None: ... 
+
+    @abc.abstractmethod
+    def delete(self) -> None: ...
+
+    @abc.abstractmethod
+    def job_config(self) -> JobSpec: ...
+
+
+class JobQueue(abc.ABC):
+    @abc.abstractmethod
+    def put(self, job_config: JobSpec) -> int: ...
+
+    @abc.abstractmethod
+    def get(self) -> Optional[Job]: ...
+
+    @property
+    @abc.abstractmethod
+    def tube(self) -> str: ...
+
+    @property
+    @abc.abstractmethod
+    def direction(self) -> QueueDirection: ...
diff --git a/teuthology/jobqueue/beanstalk.py b/teuthology/jobqueue/beanstalk.py
new file mode 100644
index 0000000000..787071c9e2
--- /dev/null
+++ b/teuthology/jobqueue/beanstalk.py
@@ -0,0 +1,68 @@
+from typing import Optional
+
+import yaml
+
+from teuthology import beanstalk
+from teuthology.jobqueue import base
+
+
+class Job(base.Job):
+    jid: int
+
+    def __init__(self, job) -> None:
+        self._job = job
+        self.jid = job.jid
+
+    def bury(self) -> None:
+        self._job.bury()
+
+    def delete(self) -> None:
+        self._job.delete()
+
+    def job_config(self) -> base.JobSpec:
+        return yaml.safe_load(self._job.body)
+
+
+class JobQueue(base.JobQueue):
+    def __init__(
+        self, connection, tube: str, direction: base.QueueDirection
+    ) -> None:
+        self._connection = connection
+        self._direction = direction
+        if direction == base.QueueDirection.IN:
+            self._connection.use(tube)
+            self._tube = tube
+        elif direction == base.QueueDirection.OUT:
+            beanstalk.watch_tube(self._connection, tube)
+            self._tube = tube
+        else:
+            raise ValueError(
+                f'invalid direction for beanstalk job queue: {direction}'
+            )
+
+    def put(self, job_config: base.JobSpec) -> int:
+        if self._direction != base.QueueDirection.IN:
+            raise ValueError('not an input queue')
+        job = yaml.safe_dump(job_config)
+        jid = self._connection.put(
+            job,
+            ttr=60 * 60 * 24,
+            priority=job_config['priority'],
+        )
+        return jid
+
+    def get(self) -> Optional[Job]:
+        if self._direction != base.QueueDirection.OUT:
+            raise ValueError('not an output queue')
+        # reserve() returns None when the timeout expires with no job
+        job = self._connection.reserve(timeout=60)
+        if job is None:
+            return None
+        return Job(job)
+
+    @property
+    def tube(self) -> str:
+        return self._tube
+
+    @property
+    def direction(self) -> base.QueueDirection:
+        return self._direction
+
+    @classmethod
+    def connect(cls, tube: str, direction: base.QueueDirection):
+        return cls(beanstalk.connect(), tube, direction)
diff --git a/teuthology/jobqueue/choice.py b/teuthology/jobqueue/choice.py
new file mode 100644
index 0000000000..7335ab7133
--- /dev/null
+++ b/teuthology/jobqueue/choice.py
@@ -0,0 +1,29 @@
+from teuthology.config import config as teuth_config
+
+from teuthology.jobqueue.base import QueueDirection, JobQueue
+from teuthology.jobqueue import beanstalk, sqlite
+from teuthology.jobqueue import file as fileq
+
+
+def from_backend(
+    backend: str, tube: str, direction: QueueDirection
+) -> JobQueue:
+    if backend in ('beanstalk', ''):
+        return beanstalk.JobQueue.connect(tube, direction)
+    if backend.startswith('@'):
+        return fileq.JobQueue(backend.lstrip('@'), direction)
+    if backend.startswith('sqlite:'):
+        return sqlite.JobQueue(backend, tube, direction)
+    raise ValueError(
+        f"Unexpected queue backend: {backend!r}"
+        " (expected 'beanstalk', '@path-to-a-file',"
+        " or 'sqlite://...')"
+    )
+
+
+def from_config(
+    tube: str, direction: QueueDirection, backend: str = ''
+) -> JobQueue:
+    if not backend:
+        backend = teuth_config.job_queue_backend or ''
+    return from_backend(backend, tube, direction)
diff --git a/teuthology/jobqueue/file.py b/teuthology/jobqueue/file.py
new file mode 100644
index 0000000000..4393617a65
--- /dev/null
+++ b/teuthology/jobqueue/file.py
@@ -0,0 +1,36 @@
+import yaml
+
+from teuthology.jobqueue import base
+
+
+class JobQueue(base.JobQueue):
+    def __init__(self, path: str, direction: base.QueueDirection) -> None:
+        if direction != base.QueueDirection.IN:
+            raise ValueError('only input (scheduling) supported')
+        self._base_path = path
+        self._count_file_path = f'{path}.count'
+        self._count = 0
+        try:
+            with open(self._count_file_path, 'r') as fh:
+                self._count = int(fh.read() or 0)
+        except FileNotFoundError:
+            pass
+
+    def put(self, job_config: base.JobSpec) -> int:
+        jid = self._count = self._count + 1
+        job_config['job_id'] = str(jid)
+        job = yaml.safe_dump(job_config)
+        with open(self._base_path, 'a') as fh:
+            fh.write('---\n')
+            fh.write(job)
+        with open(self._count_file_path, 'w') as fh:
+            fh.write(str(jid))
+        print(f'Job scheduled with name {job_config["name"]} and ID {jid}')
+        return jid
+
+    def get(self):
+        raise ValueError('not an output queue')
+
+    @property
+    def tube(self) -> str:
+        return ''
+
+    @property
+    def direction(self) -> base.QueueDirection:
+        return base.QueueDirection.IN
diff --git a/teuthology/jobqueue/sqlite.py b/teuthology/jobqueue/sqlite.py
new file mode 100644
index 0000000000..b813278f16
--- /dev/null
+++ b/teuthology/jobqueue/sqlite.py
@@ -0,0 +1,100 @@
+from typing import Optional, Tuple
+
+import json
+import sqlite3
+import time
+
+from teuthology.jobqueue import base
+
+
+class Job(base.Job):
+    jid: int
+
+    def __init__(self, jq: 'JobQueue', jid: int, data: str) -> None:
+        self._jq = jq
+        self.jid = jid
+        self._data = data
+
+    def bury(self) -> None:
+        self.delete()
+
+    def delete(self) -> None:
+        self._jq._delete(self.jid)
+
+    def job_config(self) -> base.JobSpec:
+        return json.loads(self._data)
+
+
+class JobQueue(base.JobQueue):
+    _retry_empty_sec = 30
+
+    def __init__(
+        self, path: str, tube: str, direction: base.QueueDirection
+    ) -> None:
+        self._path = path
+        self._tube = tube
+        # the sqlite job queue is always bidirectional
+        self._direction = base.QueueDirection.BIDIR
+        self._connect()
+        self._create_jobs_table()
+
+    def put(self, job_config: base.JobSpec) -> int:
+        job = json.dumps(job_config)
+        return self._insert(job)
+
+    def get(self) -> Optional[Job]:
+        result = self._select_job()
+        if result is None:
+            time.sleep(self._retry_empty_sec)
+            result = self._select_job()
+        if result is None:
+            return None
+        jid, data = result
+        return Job(self, jid, data)
+
+    @property
+    def tube(self) -> str:
+        return self._tube
+
+    @property
+    def direction(self) -> base.QueueDirection:
+        return self._direction
+
+    def _select_job(self) -> Optional[Tuple[int, str]]:
+        with self._conn:
+            cur = self._conn.cursor()
+            cur.execute(
+                "SELECT rowid,jobdesc FROM jobs ORDER BY rowid LIMIT 1"
+            )
+            rows = [(jid, data) for jid, data in cur.fetchall()]
+            if rows:
+                assert len(rows) == 1
+                return rows[0]
+            return None
+
+    def _insert(self, data: str) -> int:
+        with self._conn:
+            cur = self._conn.cursor()
+            cur.execute("INSERT INTO jobs VALUES (?)", (data,))
+            jid = cur.lastrowid
+            cur.close()
+            return jid
+
+    def _delete(self, jid: int) -> None:
+        with self._conn:
+            self._conn.execute("DELETE FROM jobs WHERE rowid=?", (jid,))
+
+    def _create_jobs_table(self) -> None:
+        try:
+            with self._conn:
+                self._conn.execute("CREATE TABLE jobs (jobdesc TEXT)")
+        except sqlite3.OperationalError:
+            pass
+
+    def _connect(self) -> None:
+        path = self._path
+        if path.startswith('sqlite://'):
+            path = path[9:]
+        if path.startswith('sqlite:'):
+            path = path[7:]
+        self._conn = sqlite3.connect(path)
diff --git a/teuthology/kill.py b/teuthology/kill.py
index 137e49080e..c3f0aef695 100755
--- a/teuthology/kill.py
+++ b/teuthology/kill.py
@@ -10,11 +10,11 @@ from typing import Union
 
 import teuthology.exporter
+import teuthology.machines
 
 from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config
-from teuthology.lock import ops as lock_ops
 
 log = logging.getLogger(__name__)
 
@@ -75,7 +75,7 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None,
     if owner is not None:
         targets = find_targets(run_name)
         names = list(targets.keys())
-        lock_ops.unlock_safe(names, owner, run_name)
+        teuthology.machines.unlock_safe(names, owner, run_name)
     report.try_mark_run_dead(run_name)
 
 
@@ -103,7 +103,7 @@ def kill_job(run_name, job_id, archive_base=None, owner=None, skip_unlock=False)
         log.warn(f"Job {job_id} has no machine_type; cannot report via Prometheus")
     if not skip_unlock:
         targets = find_targets(run_name, job_id)
-        lock_ops.unlock_safe(list(targets.keys()), owner, run_name, job_id)
+        teuthology.machines.unlock_safe(list(targets.keys()), owner, run_name, job_id)
 
 
 def find_run_info(serializer, run_name):
diff --git a/teuthology/machines/__init__.py b/teuthology/machines/__init__.py
new file mode 100644
index 0000000000..a3d4308857
--- /dev/null
+++ b/teuthology/machines/__init__.py
@@ -0,0 +1,29 @@
+
+from teuthology.machines.pool import from_config, auto_pool
+from teuthology.machines.wrappers import (
+    find_stale_locks,
+    is_vm,
+    machine_list,
+    machine_status,
+    machine_statuses,
+    must_reserve,
+    reimage_machines,
+    unlock_one,
+    unlock_safe,
+    update_lock,
+)
+
+__all__ = [
+    "auto_pool",
+    "find_stale_locks",
+    "from_config",
+    "is_vm",
+    "machine_list",
+    "machine_status",
+    "machine_statuses",
+    "must_reserve",
+    "reimage_machines",
+    "unlock_one",
+    "unlock_safe",
+    "update_lock",
+]
diff --git a/teuthology/machines/base.py b/teuthology/machines/base.py
new file mode 100644
index 0000000000..687a02a4cd
--- /dev/null
+++ b/teuthology/machines/base.py
@@ -0,0 +1,68 @@
+from typing import Any
+
+import abc
+
+
+class MachinePool(abc.ABC):
+    @abc.abstractmethod
+    def description(self) -> str: ...
+
+    @abc.abstractmethod
+    def list(
+        self,
+        machine_type: str | None = None,
+        up: bool | None = None,
+        locked: bool | None = None,
+        count: int | None = None,
+        tries: int | None = None,
+    ): ...
+
+    @abc.abstractmethod
+    def reserve(
+        self,
+        ctx: Any,
+        num: int,
+        machine_type: str,
+        user: str | None = None,
+        description: str | None = None,
+        os_type: str | None = None,
+        os_version: str | None = None,
+        arch: str | None = None,
+        reimage: bool = True,
+    ): ...
+
+    @abc.abstractmethod
+    def release(
+        self,
+        ctx: Any,
+        name: str,
+        user: str | None = None,
+        description: str | None = None,
+        status_hint: str | None = None,
+        constraints: str | None = None,
+    ) -> bool: ...
+
+    @abc.abstractmethod
+    def is_vm(self, name: str) -> bool: ...
+
+    @abc.abstractmethod
+    def statuses(self, names: list[str]) -> list[dict]: ...
+
+
+class ExtendedMachinePool(MachinePool, abc.ABC):
+    """For testing and verification of optional machine pool methods."""
+
+    @abc.abstractmethod
+    def reimage_machines(
+        self, ctx: Any, machines: list[str], machine_type: str
+    ) -> list[str]: ...
+
+    @abc.abstractmethod
+    def update_lock(
+        self,
+        name: str,
+        description: str | None = None,
+        status: str | None = None,
+        ssh_pub_key: str | None = None,
+    ): ...
+
+    @abc.abstractmethod
+    def status(self, machine: str) -> dict: ...
diff --git a/teuthology/machines/lock_server_pool.py b/teuthology/machines/lock_server_pool.py
new file mode 100644
index 0000000000..58bfe3f48e
--- /dev/null
+++ b/teuthology/machines/lock_server_pool.py
@@ -0,0 +1,100 @@
+
+from teuthology.machines.base import MachinePool
+from teuthology.lock import query, ops
+
+
+class LockServerMachinePool(MachinePool):
+    def description(self) -> str:
+        return "Locks Physical Lab Machines via API"
+
+    def list(
+        self,
+        machine_type=None,
+        up=None,
+        locked=None,
+        count=None,
+        tries=None,
+    ):
+        kwargs = {}
+        if machine_type is not None:
+            kwargs['machine_type'] = machine_type
+        if up is not None:
+            kwargs['up'] = up
+        if locked is not None:
+            kwargs['locked'] = locked
+        if count is not None:
+            kwargs['count'] = count
+        if tries is not None:
+            kwargs['tries'] = tries
+        return query.list_locks(**kwargs)
+
+    def reserve(
+        self,
+        ctx,
+        num,
+        machine_type,
+        user=None,
+        description=None,
+        os_type=None,
+        os_version=None,
+        arch=None,
+        reimage=True,
+        min_spare=None,
+    ):
+        return ops.lock_many(
+            ctx,
+            num,
+            machine_type=machine_type,
+            user=ctx.owner,
+            description=ctx.archive,
+            os_type=os_type,
+            os_version=os_version,
+            arch=arch,
+            reimage=reimage,
+        )
+
+    def release(
+        self,
+        ctx,
+        name,
+        user=None,
+        description=None,
+        status_hint=None,
+        run_name=None,
+        job_id=None,
+    ) -> bool:
+        if job_id or run_name:
+            assert (
+                not description
+            ), "description not supported with job_id/run_name"
+            assert (
+                not status_hint
+            ), "status_hint not supported with job_id/run_name"
+            return ops.unlock_one_safe(
+                name=name,
+                owner=user,
+                run_name=run_name,
+                job_id=job_id,
+            )
+        return ops.unlock_one(
+            name=name,
+            user=user,
+            description=description,
+            status=status_hint,
+        )
+
+    def is_vm(self, name: str) -> bool:
+        return query.is_vm(name)
+
+    def statuses(self, machines: 'list[str]') -> 'list[dict]':
+        return query.get_statuses(machines)
+
+    def update_lock(
+        self, name, description=None, status=None, ssh_pub_key=None
+    ):
+        return ops.update_lock(
+            name,
+            description=description,
+            status=status,
+            ssh_pub_key=ssh_pub_key,
+        )
diff --git a/teuthology/machines/pool.py b/teuthology/machines/pool.py
new file mode 100644
index 0000000000..24ad181cd5
--- /dev/null
+++ b/teuthology/machines/pool.py
@@ -0,0 +1,28 @@
+
+import logging
+
+from teuthology.config import config
+from teuthology.machines.base import MachinePool
+from teuthology.machines.lock_server_pool import LockServerMachinePool
+from teuthology.machines.sqlite_pool import SqliteMachinePool
+
+
+log = logging.getLogger(__name__)
+
+
+def from_config() -> MachinePool:
+    if not config.machine_pool or config.machine_pool.startswith(
+        'lock_server'
+    ):
+        mpool = LockServerMachinePool()
+    elif config.machine_pool.startswith('sqlite:'):
+        mpool = SqliteMachinePool()
+    else:
+        raise ValueError(
+            f'unsupported machine pool backend: {config.machine_pool!r}'
+        )
+    log.info("Using machine pool: %s", mpool.description())
+    return mpool
+
+
+def auto_pool(ctx=None, pool: MachinePool | None = None) -> MachinePool:
+    if pool is not None:
+        return pool
+    # TODO: cached pool on ctx if ctx is given
+    return from_config()
diff --git a/teuthology/machines/sqlite_pool.py b/teuthology/machines/sqlite_pool.py
new file mode 100644
index 0000000000..277e5cc41a
--- /dev/null
+++ b/teuthology/machines/sqlite_pool.py
@@ -0,0 +1,538 @@
+from typing import Any
+
+import contextlib
+import functools
+import json
+import logging
+import shlex
+import sqlite3
+import subprocess
+import time
+import traceback
+
+from teuthology.config import config
+from teuthology.machines.base import MachinePool
+
+
+log = logging.getLogger(__name__)
+
+
+class TooFewMachines(Exception):
+    pass
+
+
+class _SqliteDBManager:
+    def __init__(self, path: str, *, automatic_release: bool = False) -> None:
+        assert path
+        self._path = path
+        self._connect()
+        self._create_tables()
+        self.automatic_release = automatic_release
+        log.info(
+            "Initialized sqlite machine pool db manager: %r, %r",
+            path,
+            automatic_release,
+        )
+
+    def _connect(self) -> None:
+        path = self._path
+        if path.startswith('sqlite://'):
+            path = path[9:]
+        if path.startswith('sqlite:'):
+            path = path[7:]
+        log.info("sqlite3 db path: %s", path)
+        self._conn = sqlite3.connect(path, isolation_level=None)
+        self._conn.row_factory = sqlite3.Row
+        self._conn.set_trace_callback(log.info)
+
+    def _create_tables(self) -> None:
+        try:
+            with self._conn:
+                self._conn.execute(
+                    """
+                    CREATE TABLE IF NOT EXISTS machines (
+                        name TEXT UNIQUE,
+                        machine_type TEXT,
+                        up INTEGER,
+                        in_use INTEGER,
+                        user TEXT,
+                        desc TEXT,
+                        info JSON
+                    )
+                    """
+                )
+                self._conn.execute(
+                    """
+                    CREATE TABLE IF NOT EXISTS hooks (
+                        hook TEXT UNIQUE,
+                        command JSON
+                    )
+                    """
+                )
+        except sqlite3.OperationalError:
+            pass
+
+    @contextlib.contextmanager
+    def _tx(self):
+        try:
+            cur = self._conn.cursor()
+            cur.execute('BEGIN;')
+            yield cur
+            self._conn.commit()
+        except Exception:
+            self._conn.rollback()
+            raise
+
+    def select(
+        self,
+        *,
+        machine_type: str | None = None,
+        up: bool | None = None,
+        locked: bool | None = None,
+        user: str | None = None,
+        desc: str | None = None,
+        limit: int | None = None,
+    ):
+        query = (
+            "SELECT name, machine_type, up, in_use, user, desc, info"
+            " FROM machines"
+        )
+        where = _Where()
+        where.add_if('machine_type', machine_type)
+        where.add_if('up', up)
+        where.add_if('in_use', locked)
+        where.add_if('user', user)
+        where.add_if('desc', desc, (int, str, _Like))
+        where_params = where.parameters()
+        if where_params:
+            query += f' {where}'
+        if limit is not None:
+            query += ' LIMIT ' + str(int(limit))
+
+        with self._tx() as cur:
+            cur.execute(query, tuple(where_params))
+            rows = cur.fetchall()
+            log.info("Rows: %r", rows)
+            return rows
+
+    def add_machine(self, name, machine_type, info):
+        with self._tx() as cur:
+            cur.execute(
+                "INSERT INTO machines VALUES (?,?, 1, 0, '', '', ?)",
+                (name, machine_type, info),
+            )
+
+    def remove_machine(self, name):
+        with self._tx() as cur:
+            cur.execute("DELETE FROM machines WHERE name=?", (name,))
+
+    def remove_all_machines(self):
+        with self._tx() as cur:
+            cur.execute('DELETE FROM machines')
+
+    def take(self, machine_type: str, count: int, user: str, desc: str):
+        count = int(count)
+        query = (
+            "UPDATE machines"
+            " SET in_use=1, user=?, desc=?"
+            " WHERE rowid"
+            " IN ("
+            " SELECT rowid FROM machines"
+            " WHERE in_use=0 AND machine_type=? LIMIT ?"
+ ")" + ) + with self._tx() as cur: + cur.execute(query, (user, desc, machine_type, count)) + if cur.rowcount != count: + raise TooFewMachines() + + def release( + self, name: str, user: str | None = None, desc: str | None = None + ) -> bool: + where = _Where() + where.add_if('name', name) + where.add_if('user', user) + where.add_if('desc', desc, (int, str, _Like)) + if self.automatic_release: + query = f"UPDATE machines SET in_use=0, user='', desc='' {where}" + else: + query = f"UPDATE machines SET up=0 {where}" + with self._tx() as cur: + cur.execute(query, tuple(where.parameters())) + modified = cur.rowcount >= 1 + return modified + + def get_hook(self, hook_name: str) -> dict: + query = "SELECT command FROM hooks WHERE hook = ? LIMIT 1" + with self._tx() as cur: + cur.execute(query, (hook_name,)) + rows = cur.fetchall() + log.info("Rows: %r", rows) + if rows: + return json.loads(rows[0]['command']) + return {} + + def set_hook(self, hook_name: str, command: dict) -> None: + query = "INSERT or REPLACE INTO hooks VALUES (?, ?)" + cj = json.dumps(command) + with self._tx() as cur: + cur.execute(query, (hook_name, cj)) + + +class _Like: + def __init__(self, *values): + self.values = list(values) + + def __str__(self): + return ''.join(self.values) + + +class _EndsWith(_Like): + def __init__(self, value): + super().__init__('%', value) + + +class _Where: + def __init__(self): + self._where = [] + + def add(self, key: str, value: Any) -> None: + self._where.append((key, value)) + + def add_if(self, key: str, value: Any, allowed_types: 'Iterable[Type] | None' = None) -> None: + if value is None: + return + if not allowed_types: + allowed_types = (int, str) + if not isinstance(value, tuple(allowed_types)): + raise TypeError(f'type {type(value)} not allowed') + self.add(key, value) + + def __str__(self) -> str: + wh = ' AND '.join(self._op(k, v) for k, v in self._where) + return f'WHERE ({wh})' + + def _op(self, key: str, value: Any): + if isinstance(value, _Like): + return f'{key} LIKE ?' + return f'{key} = ?' 
+ + def _value(self, value): + if isinstance(value, int): + return value + return str(value) + + def parameters(self) -> list[Any]: + return [self._value(v) for _, v in self._where] + + +def _track(fn): + functools.wraps(fn) + + def _fn(*args, **kwargs): + log.warning('CALLING sqlite_pool fn %s: %r, %r', fn, args, kwargs) + result = fn(*args, **kwargs) + log.warning('CALLED sqlite_pool fn %s, got %r', fn, result) + return result + + return _fn + + +class _Hook: + def __init__(self, arguments: list[str]) -> None: + self.arguments = arguments + if not isinstance(self.arguments, list): + raise ValueError('expected arguments list') + + def _execute(self, command: list[str]) -> None: + log.info( + "Running hook command: %s", + " ".join(shlex.quote(c) for c in command) + ) + result = subprocess.run(command, capture_output=True) + log.info("Command result: %s: %r, %r", result.returncode, + result.stdout, result.stderr) + if result.returncode != 0: + raise RuntimeError('hook command failed') + + def execute(self) -> None: + self._execute(self.arguments) + + +class _NamedHook(_Hook): + def __init__(self, arguments: list[str]) -> None: + super().__init__(arguments) + if '${NAME}' not in self.arguments: + raise ValueError('no name variable in arguments') + + def _replace(self, name: str) -> list[str]: + out = [] + for term in self.arguments: + if term == '${NAME}': + out.append(name) + else: + out.append(term) + return out + + def execute(self, name: str) -> None: + command = self._replace(name) + self._execute(command) + + +class _NoOpHook: + def execute(self, name: str) -> None: + return None + + +class SqliteMachinePool(MachinePool): + def __init__(self, *, path=None, automatic_release=False): + if not path: + path = config.machine_pool + if not automatic_release: + _sqp = config.get('sqlite_pool', {}) or {} + log.warning('xxx: %r', _sqp) + automatic_release = _sqp.get('automatic_release', False) + log.warning("zzz: %r", automatic_release) + self.dbmgr = _SqliteDBManager(path, automatic_release=automatic_release) + self._delay_sec = 15 + + def description(self) -> str: + return "Machine Pool Managed via Local SQLite3 DB" + + @_track + def list( + self, + machine_type=None, + up=None, + locked=None, + count=None, + tries=None, + ): + return {v['name']:None for v in self._list()} + + def _list( + self, + machine_type=None, + up=None, + locked=None, + count=None, + tries=None, + ): + result = { + v + for v in self.dbmgr.select( + machine_type=machine_type, up=up, locked=locked, limit=count + ) + } + return result + + @_track + def everything(self): + return [dict(v) for v in self.dbmgr.select()] + + @_track + def reserve( + self, + ctx, + num, + machine_type, + user=None, + description=None, + os_type=None, + os_version=None, + arch=None, + reimage=True, + min_spare: int = 0, + ) -> None: + user = user or getattr(ctx, 'owner', '') + description = description or getattr(ctx, 'archive', '') + if not user or not description: + raise ValueError('missing user or description (archive)') + while True: + available = self.dbmgr.select( + machine_type=machine_type, + up=True, + locked=False, + limit=(num + min_spare), + ) + if len(available) < (num + min_spare): + log.info( + 'too few free nodes: requested %d, need %d spare, have %d', + num, + min_spare, + len(available), + ) + time.sleep(self._delay_sec) + continue + try: + self.dbmgr.take( + machine_type, + num, + user, + description, + ) + except TooFewMachines: + log.warning('too few nodes to take (possible race)') + time.sleep(self._delay_sec) + continue 
+            reserved = self.dbmgr.select(
+                machine_type=machine_type,
+                up=True,
+                locked=True,
+                user=user,
+                desc=description,
+            )
+            assert num == len(reserved), f"needed {num} machines, got {len(reserved)}"
+            ctx.config['targets'] = {v['name']: None for v in reserved}
+            return
+
+    @_track
+    def release(
+        self,
+        ctx,
+        name,
+        user=None,
+        description=None,
+        status_hint=None,
+        run_name=None,
+        job_id=None,
+    ) -> bool:
+        desc = None
+        if run_name or job_id:
+            desc = _EndsWith(f"{run_name}/{job_id}")
+        return self.dbmgr.release(
+            name,
+            user=user,
+            desc=desc,
+        )
+
+    @_track
+    def is_vm(self, name: str) -> bool:
+        # always return false. this may be a lie, but teuthology's
+        # meaning of is_vm doesn't really mean it's actually a vm, but
+        # that it needs special handling. It doesn't, because this
+        # machine pool abstracts that away.
+        return False
+
+    @_track
+    def statuses(self, machines: 'list[str]') -> 'list[dict]':
+        out = []
+        for v in self._list():
+            if machines and v['name'] not in machines:
+                continue
+            out.append({
+                'name': v['name'],
+                'machine_type': v['machine_type'],
+                'locked': v['in_use'],
+                'user': v['user'],
+                'description': v['desc'],
+                'info': v['info'],
+            })
+        return out
+
+    @_track
+    def status(self, machine: str) -> dict:
+        # this function is sometimes fed fqdns and that's not what we want.
+        name = machine.split('.', 1)[0]
+        name = name.split('@', 1)[-1]
+        return self.statuses([name])[0]
+
+    @_track
+    def reimage_machines(self, machines, machine_type):
+        hook = self._reimage_hook()
+        res = {m: hook.execute(m) for m in machines}
+        post_hook = self._post_reimage_hook()
+        post_hook.execute()
+        return res
+
+    def _reimage_hook(self) -> _Hook:
+        if not self.dbmgr.automatic_release:
+            log.info("Automatic release not set, will not reimage")
+            return _NoOpHook()
+        hook_cfg = self.dbmgr.get_hook('reimage')
+        if hook_cfg:
+            return _NamedHook(**hook_cfg)
+        log.info("No reimage hook command found")
+        return _NoOpHook()
+
+    def _post_reimage_hook(self) -> _Hook:
+        if not self.dbmgr.automatic_release:
+            log.info("Automatic release not set, will not run post-reimage hook")
+            return _NoOpHook()
+        hook_cfg = self.dbmgr.get_hook('postreimage')
+        if hook_cfg:
+            return _Hook(**hook_cfg)
+        log.info("No post-reimage hook command found")
+        return _NoOpHook()
+
+
+def main():
+    import argparse
+    import sys
+    import yaml
+
+    class Context:
+        pass
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--list', action='store_true')
+    parser.add_argument('--add', action='append')
+    parser.add_argument('--rm-all', action='store_true')
+    parser.add_argument('--rm', action='append')
+    parser.add_argument('--reserve', type=int)
+    parser.add_argument('--user-desc', type=str)
+    parser.add_argument('--machine-type')
+    parser.add_argument('--info')
+    parser.add_argument('--release', type=json.loads)
+    parser.add_argument('--set-hook', type=json.loads)
+    parser.add_argument('--get-hook', type=str)
+    cli = parser.parse_args()
+
+    mpool = SqliteMachinePool()
+    if cli.rm_all:
+        mpool.dbmgr.remove_all_machines()
+    for name in cli.rm or []:
+        mpool.dbmgr.remove_machine(name)
+    for name in cli.add or []:
+        mpool.dbmgr.add_machine(name, cli.machine_type, cli.info)
+    if cli.reserve:
+        ctx = Context()
+        ctx.config = {}
+        user, desc = (cli.user_desc or '%').split('%', 1)
+        mpool.reserve(
+            ctx, cli.reserve, cli.machine_type, user=user, description=desc
+        )
+    if cli.list:
+        yaml.safe_dump(mpool.everything(), sys.stdout, sort_keys=False)
+    if cli.release:
+        log.info("RELEASE %r", cli.release)
+        name = cli.release.get('name')
+        user = cli.release.get('user')
+        status_hint = cli.release.get('status_hint')
+        run_name = cli.release.get('run_name')
+        job_id = cli.release.get('job_id')
+        mpool.release(
+            Context(),
+            name,
+            user=user,
+            status_hint=status_hint,
+            run_name=run_name,
+            job_id=job_id,
+        )
+    if cli.set_hook:
+        if not isinstance(cli.set_hook, dict):
+            raise ValueError('incorrect type')
+        _keys = list(cli.set_hook.keys())
+        if len(_keys) != 1:
+            raise ValueError('incorrect number of keys')
+        hook_name = _keys[0]
+        hook_command = cli.set_hook[hook_name]
+        log.info('SET HOOK %r: %r', hook_name, hook_command)
+        _Hook(**hook_command)  # validate
+        mpool.dbmgr.set_hook(hook_name, hook_command)
+    if cli.get_hook:
+        print(mpool.dbmgr.get_hook(cli.get_hook))
+
+
+if __name__ == '__main__':
+    main()
diff --git a/teuthology/machines/wrappers.py b/teuthology/machines/wrappers.py
new file mode 100644
index 0000000000..1ee9429abf
--- /dev/null
+++ b/teuthology/machines/wrappers.py
@@ -0,0 +1,151 @@
+from teuthology.config import config
+from teuthology.machines.base import MachinePool
+from teuthology.machines.pool import auto_pool
+
+
+def _temp_context():
+    return None
+
+
+def must_reserve(
+    ctx,
+    num: int,
+    machine_type: str,
+    reimage: bool = True,
+    tries=None,
+    *,
+    machine_pool: MachinePool | None = None,
+) -> None:
+    os_type = ctx.config.get("os_type")
+    os_version = ctx.config.get("os_version")
+    arch = ctx.config.get("arch")
+    reserved = config.reserve_machines
+    assert isinstance(reserved, int), "reserve_machines must be integer"
+    assert reserved >= 0, "reserve_machines must be >= 0"
+
+    return auto_pool(pool=machine_pool).reserve(
+        ctx,
+        num=num,
+        min_spare=reserved,
+        machine_type=machine_type,
+        user=ctx.owner,
+        description=ctx.archive,
+        os_type=os_type,
+        os_version=os_version,
+        arch=arch,
+        reimage=reimage,
+    )
+
+
+def unlock_one(
+    name: str,
+    user: str,
+    description: str | None = None,
+    status: dict | None = None,
+    *,
+    machine_pool: MachinePool | None = None,
+) -> bool:
+    return auto_pool(pool=machine_pool).release(
+        _temp_context(),
+        name=name,
+        user=user,
+        description=description,
+        status_hint=status,
+    )
+
+
+def unlock_safe(
+    names: list[str],
+    owner: str,
+    run_name: str = "",
+    job_id: str = "",
+    *,
+    machine_pool: MachinePool | None = None,
+) -> bool:
+    pool = auto_pool(pool=machine_pool)
+
+    def _unlock(name: str) -> bool:
+        return pool.release(
+            _temp_context(),
+            name=name,
+            user=owner,
+            run_name=run_name,
+            job_id=job_id,
+        )
+
+    # Does this NEED to be parallel? It is in the original version.
+    return all(_unlock(name) for name in names)
+
+
+def find_stale_locks(
+    owner=None, *, machine_pool: MachinePool | None = None
+) -> list[dict]:
+    pool = auto_pool(pool=machine_pool)
+    return pool.list(stale=True, owner=owner)
+
+
+def reimage_machines(
+    ctx, machines, machine_type, *, machine_pool: MachinePool | None = None
+):
+    pool = auto_pool(pool=machine_pool)
+    reimage_machines_fn = getattr(pool, 'reimage_machines', None)
+    if reimage_machines_fn is None:
+        raise NotImplementedError("pool does not support reimage_machines")
+    return reimage_machines_fn(machines, machine_type)
+
+
+def update_lock(
+    name,
+    description=None,
+    status=None,
+    ssh_pub_key=None,
+    *,
+    machine_pool: MachinePool | None = None,
+) -> bool:
+    pool = auto_pool(pool=machine_pool)
+    update_lock_fn = getattr(pool, 'update_lock', None)
+    if update_lock_fn is None:
+        raise NotImplementedError("pool does not support update_lock")
+    return update_lock_fn(name, description, status, ssh_pub_key)
+
+
+def is_vm(
+    name: str,
+    *,
+    machine_pool: MachinePool | None = None,
+) -> bool:
+    return auto_pool(pool=machine_pool).is_vm(name)
+
+
+def machine_status(
+    name: str,
+    *,
+    machine_pool: MachinePool | None = None,
+):
+    pool = auto_pool(pool=machine_pool)
+    machine_status_fn = getattr(pool, 'status', None)
+    if machine_status_fn is not None:
+        return machine_status_fn(name)
+    # fall back to getting one machine from statuses
+    return pool.statuses([name])[0]
+
+
+def machine_statuses(
+    names,
+    *,
+    machine_pool: MachinePool | None = None,
+):
+    pool = auto_pool(pool=machine_pool)
+    return pool.statuses(names)
+
+
+def machine_list(
+    machine_type: str|None = None,
+    count: int|None = None,
+    tries: int|None = None,
+    *,
+    machine_pool: MachinePool | None = None,
+) -> list:
+    # same as machine_statuses?
+    pool = auto_pool(pool=machine_pool)
+    return pool.list(machine_type=machine_type, count=count, tries=tries)
diff --git a/teuthology/orchestra/console.py b/teuthology/orchestra/console.py
index a9c67ebbf1..5dfdfbda98 100644
--- a/teuthology/orchestra/console.py
+++ b/teuthology/orchestra/console.py
@@ -9,8 +9,8 @@
 
 from typing import Union, Literal, Optional
 
-import teuthology.lock.query
-import teuthology.lock.util
+import teuthology.machines
+
 from teuthology.config import config
 from teuthology.contextutil import safe_while
 from teuthology.exceptions import ConsoleError
@@ -384,9 +384,9 @@ def __init__(self, name):
         self.shortname = self.getShortName(name)
         self.log = log.getChild(self.shortname)
 
-        status_info = teuthology.lock.query.get_status(self.shortname)
+        status_info = teuthology.machines.machine_status(self.shortname)
         try:
-            if teuthology.lock.query.is_vm(status=status_info):
+            if teuthology.machines.is_vm(self.shortname):
                 phys_host = status_info['vm_host']['name'].split('.')[0]
         except TypeError:
             raise RuntimeError("Cannot create a virtual console for %s", name)
diff --git a/teuthology/orchestra/remote.py b/teuthology/orchestra/remote.py
index 36d695e3aa..5ffc967f30 100644
--- a/teuthology/orchestra/remote.py
+++ b/teuthology/orchestra/remote.py
@@ -2,8 +2,6 @@
 Support for paramiko remote objects.
""" -import teuthology.lock.query -import teuthology.lock.util from teuthology.contextutil import safe_while from teuthology.orchestra import run from teuthology.orchestra import connection @@ -153,7 +151,7 @@ def chcon(self, file_path, context): if self.os.package_type != 'rpm' or \ self.os.name in ['opensuse', 'sle']: return - if teuthology.lock.query.is_vm(self.shortname): + if teuthology.machines.is_vm(self.shortname): return self.run(args="sudo chcon {con} {path}".format( con=context, path=file_path)) @@ -510,7 +508,7 @@ def hostname(self): @property def machine_type(self): if not getattr(self, '_machine_type', None): - remote_info = teuthology.lock.query.get_status(self.hostname) + remote_info = teuthology.machines.machine_status(self.hostname) if not remote_info: return None self._machine_type = remote_info.get("machine_type", None) @@ -758,7 +756,7 @@ def console(self): @property def is_vm(self): if not hasattr(self, '_is_vm'): - self._is_vm = teuthology.lock.query.is_vm(self.name) + self._is_vm = teuthology.machines.is_vm(self.name) return self._is_vm @property @@ -797,7 +795,7 @@ def getRemoteConsole(name, ipmiuser=None, ipmipass=None, ipmidomain=None, """ Return either VirtualConsole or PhysicalConsole depending on name. """ - if teuthology.lock.query.is_vm(name): + if teuthology.machines.is_vm(name): try: return console.VirtualConsole(name) except Exception: diff --git a/teuthology/provision/__init__.py b/teuthology/provision/__init__.py index 48392eabae..c8d490784a 100644 --- a/teuthology/provision/__init__.py +++ b/teuthology/provision/__init__.py @@ -64,7 +64,7 @@ def create_if_vm(ctx, machine_name, _downburst=None): if _downburst: status_info = _downburst.status else: - status_info = teuthology.lock.query.get_status(machine_name) + status_info = teuthology.machines.machine_status(machine_name) shortname = decanonicalize_hostname(machine_name) machine_type = status_info['machine_type'] os_type = get_distro(ctx) diff --git a/teuthology/schedule.py b/teuthology/schedule.py index d9af64efc4..5d1409b219 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,9 +1,9 @@ -import os import yaml -import teuthology.beanstalk -from teuthology.misc import get_user, merge_configs from teuthology import report +from teuthology.jobqueue.base import QueueDirection +from teuthology.misc import get_user, merge_configs +import teuthology.jobqueue.choice def main(args): @@ -32,16 +32,11 @@ def main(args): if not name or name.isdigit(): raise ValueError("Please use a more descriptive value for --name") job_config = build_config(args) - backend = args['--queue-backend'] if args['--dry-run']: print('---\n' + yaml.safe_dump(job_config)) - elif backend == 'beanstalk': - schedule_job(job_config, args['--num'], report_status) - elif backend.startswith('@'): - dump_job_to_file(backend.lstrip('@'), job_config, args['--num']) - else: - raise ValueError("Provided schedule backend '%s' is not supported. " - "Try 'beanstalk' or '@path-to-a-file" % backend) + return + queue = select_queue(args['--queue-backend'], job_config) + schedule_job(queue, job_config, args['--num'], report_status) def build_config(args): @@ -87,57 +82,29 @@ def build_config(args): return job_config -def schedule_job(job_config, num=1, report_status=True): +def schedule_job(queue, job_config, num=1, report_status=True): """ Schedule a job. 
+ :param queue: Job queue object :param job_config: The complete job dict :param num: The number of times to schedule the job + :param report_status: Enable or disable reporting job """ - num = int(num) - job = yaml.safe_dump(job_config) - tube = job_config.pop('tube') - beanstalk = teuthology.beanstalk.connect() - beanstalk.use(tube) - while num > 0: - jid = beanstalk.put( - job, - ttr=60 * 60 * 24, - priority=job_config['priority'], - ) - print('Job scheduled with name {name} and ID {jid}'.format( - name=job_config['name'], jid=jid)) - job_config['job_id'] = str(jid) + for _ in range(int(num)): + queue.put(job_config) if report_status: report.try_push_job_info(job_config, dict(status='queued')) - num -= 1 -def dump_job_to_file(path, job_config, num=1): +def select_queue(backend, job_config): """ - Schedule a job. + Select the kind of job queue to use. + :param backend: String name of queue :param job_config: The complete job dict - :param num: The number of times to schedule the job - :param path: The file path where the job config to append """ - num = int(num) - count_file_path = path + '.count' - - jid = 0 - if os.path.exists(count_file_path): - with open(count_file_path, 'r') as f: - jid=int(f.read() or '0') - - with open(path, 'a') as f: - while num > 0: - jid += 1 - job_config['job_id'] = str(jid) - job = yaml.safe_dump(job_config) - print('Job scheduled with name {name} and ID {jid}'.format( - name=job_config['name'], jid=jid)) - f.write('---\n' + job) - num -= 1 - with open(count_file_path, 'w') as f: - f.write(str(jid)) - + tube = job_config.get('tube', '') + return teuthology.jobqueue.choice.from_backend( + backend, tube, QueueDirection.IN + ) diff --git a/teuthology/suite/test/test_util.py b/teuthology/suite/test/test_util.py index daa583023b..214cf0248b 100644 --- a/teuthology/suite/test/test_util.py +++ b/teuthology/suite/test/test_util.py @@ -99,17 +99,17 @@ def test_get_branch_info(self, m_get): ) assert result == "some json" - @patch('teuthology.lock.query') - def test_get_arch_fail(self, m_query): - m_query.list_locks.return_value = False + @patch('teuthology.lock.query.list_locks') + def test_get_arch_fail(self, m_list_locks): + m_list_locks.return_value = False util.get_arch('magna') - m_query.list_locks.assert_called_with(machine_type="magna", count=1, tries=1) + m_list_locks.assert_called_with(machine_type="magna", count=1, tries=1) - @patch('teuthology.lock.query') - def test_get_arch_success(self, m_query): - m_query.list_locks.return_value = [{"arch": "arch"}] + @patch('teuthology.lock.query.list_locks') + def test_get_arch_success(self, m_list_locks): + m_list_locks.return_value = [{"arch": "arch"}] result = util.get_arch('magna') - m_query.list_locks.assert_called_with( + m_list_locks.assert_called_with( machine_type="magna", count=1, tries=1 ) diff --git a/teuthology/suite/util.py b/teuthology/suite/util.py index cc884ebf90..acd0e9d2cc 100644 --- a/teuthology/suite/util.py +++ b/teuthology/suite/util.py @@ -10,8 +10,6 @@ from email.mime.text import MIMEText -import teuthology.lock.query -import teuthology.lock.util from teuthology import repo_utils from teuthology.config import config @@ -22,6 +20,7 @@ from teuthology.packaging import get_builder_project, VersionNotFoundError from teuthology.repo_utils import build_git_url from teuthology.task.install import get_flavor +import teuthology.machines log = logging.getLogger(__name__) @@ -263,7 +262,7 @@ def get_arch(machine_type): :returns: A string or None """ - result = 
teuthology.lock.query.list_locks(machine_type=machine_type, count=1, tries=1)
+    result = teuthology.machines.machine_list(machine_type=machine_type, count=1, tries=1)
     if not result:
         log.warning("No machines found with machine_type %s!", machine_type)
     else:
diff --git a/teuthology/task/internal/check_lock.py b/teuthology/task/internal/check_lock.py
index 152e41c2d9..b824fee5f5 100644
--- a/teuthology/task/internal/check_lock.py
+++ b/teuthology/task/internal/check_lock.py
@@ -1,7 +1,6 @@
 import logging
 
-import teuthology.lock.query
-import teuthology.lock.util
+import teuthology.machines
 
 from teuthology.config import config as teuth_config
 
@@ -17,7 +16,7 @@ def check_lock(ctx, config, check_up=True):
         return
     log.info('Checking locks...')
     for machine in ctx.config['targets'].keys():
-        status = teuthology.lock.query.get_status(machine)
+        status = teuthology.machines.machine_status(machine)
         log.debug('machine status is %s', repr(status))
         assert status is not None, \
             'could not read lock status for {name}'.format(name=machine)
diff --git a/teuthology/task/internal/lock_machines.py b/teuthology/task/internal/lock_machines.py
index fdbfcc2251..9953245c2a 100644
--- a/teuthology/task/internal/lock_machines.py
+++ b/teuthology/task/internal/lock_machines.py
@@ -1,9 +1,7 @@
 import contextlib
 import logging
 
-import teuthology.lock.ops
-import teuthology.lock.query
-import teuthology.lock.util
+import teuthology.machines
 
 log = logging.getLogger(__name__)
 
@@ -19,11 +17,11 @@ def lock_machines(ctx, config):
     machine_type = config[1]
     total_requested = config[0]
     # We want to make sure there are always this many machines available
-    teuthology.lock.ops.block_and_lock_machines(ctx, total_requested, machine_type)
+    teuthology.machines.must_reserve(ctx, total_requested, machine_type)
    try:
        yield
    finally:
        if ctx.config.get("unlock_on_failure", True):
            log.info('Unlocking machines...')
            for machine in ctx.config['targets'].keys():
-                teuthology.lock.ops.unlock_one(machine, ctx.owner, ctx.archive)
+                teuthology.machines.unlock_one(machine, ctx.owner, ctx.archive)