Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@
setup_log_file,
install_except_hook,
# modules
beanstalk,
exporter,
report,
repo_utils,
)
from teuthology.config import config as teuth_config
from teuthology.dispatcher import supervisor
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
from teuthology.lock import ops as lock_ops
from teuthology.util.time import parse_timestamp
from teuthology import safepath
from teuthology.jobqueue.base import QueueDirection
import teuthology.machines
import teuthology.jobqueue.choice


log = logging.getLogger(__name__)
start_time = datetime.datetime.now(datetime.timezone.utc)
Expand Down Expand Up @@ -88,8 +90,7 @@ def main(args):

load_config(archive_dir=archive_dir)

connection = beanstalk.connect()
beanstalk.watch_tube(connection, args.tube)
queue = teuthology.jobqueue.choice.from_config(args.tube, QueueDirection.OUT)
result_proc = None

if teuth_config.teuthology_path is None:
Expand Down Expand Up @@ -118,7 +119,7 @@ def main(args):
if rc is not None:
worst_returncode = max([worst_returncode, rc])
job_procs.remove(proc)
job = connection.reserve(timeout=60)
job = queue.get()
if job is None:
if args.exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
Expand All @@ -129,8 +130,8 @@ def main(args):
job.bury()
job_id = job.jid
log.info('Reserved job %d', job_id)
log.info('Config is: %s', job.body)
job_config = yaml.safe_load(job.body)
job_config = job.job_config()
log.info('Config is: %s', job_config)
job_config['job_id'] = str(job_id)

if job_config.get('stop_worker'):
Expand Down Expand Up @@ -182,7 +183,7 @@ def main(args):
log.exception(error_message)
if 'targets' in job_config:
node_names = job_config["targets"].keys()
lock_ops.unlock_safe(
teuthology.machines.unlock_safe(
node_names,
job_config["owner"],
job_config["name"],
Expand Down Expand Up @@ -345,7 +346,7 @@ def lock_machines(job_config):
machine_type=machine_type,
count=count,
):
lock_ops.block_and_lock_machines(
teuthology.machines.must_reserve(
fake_ctx,
count,
machine_type,
Expand Down
18 changes: 12 additions & 6 deletions teuthology/dispatcher/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
from teuthology import setup_log_file, install_except_hook
from teuthology.misc import get_user, archive_logs, compress_logs
from teuthology.config import FakeNamespace
from teuthology.lock import ops as lock_ops
from teuthology.task import internal
from teuthology.misc import decanonicalize_hostname as shortname
from teuthology.lock import query
from teuthology.util import sentry
import teuthology.machines

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -205,7 +204,7 @@ def check_for_reimage_failures_and_mark_down(targets, count=10):
continue
# Mark machine down
machine_name = shortname(k)
lock_ops.update_lock(
teuthology.machines.update_lock(
machine_name,
description='reimage failed {0} times'.format(count),
status='down',
Expand All @@ -223,7 +222,7 @@ def reimage(job_config):
report.try_push_job_info(ctx.config, dict(status='waiting'))
targets = job_config['targets']
try:
reimaged = lock_ops.reimage_machines(ctx, targets, job_config['machine_type'])
reimaged = teuthology.machines.reimage_machines(ctx, targets, job_config['machine_type'])
except Exception as e:
log.exception('Reimaging error. Nuking machines...')
unlock_targets(job_config)
Expand Down Expand Up @@ -251,7 +250,9 @@ def unlock_targets(job_config):

:param job_config: dict, job config data
"""
machine_statuses = query.get_statuses(job_config['targets'].keys())
machine_statuses = teuthology.machines.machine_statuses(
job_config['targets'].keys()
)
locked = []
for status in machine_statuses:
name = shortname(status['name'])
Expand All @@ -269,7 +270,12 @@ def unlock_targets(job_config):
return
if job_config.get("unlock_on_failure", True):
log.info('Unlocking machines...')
lock_ops.unlock_safe(locked, job_config["owner"], job_config["name"], job_config["job_id"])
teuthology.machines.unlock_safe(
locked,
job_config["owner"],
job_config["name"],
job_config["job_id"],
)


def run_with_watchdog(process, job_config):
Expand Down
8 changes: 4 additions & 4 deletions teuthology/dispatcher/test/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ def build_fake_jobs(self, m_connection, m_job, job_bodies):
@patch("beanstalkc.Job", autospec=True)
@patch("teuthology.repo_utils.fetch_qa_suite")
@patch("teuthology.repo_utils.fetch_teuthology")
@patch("teuthology.dispatcher.beanstalk.watch_tube")
@patch("teuthology.dispatcher.beanstalk.connect")
@patch("teuthology.jobqueue.beanstalk.beanstalk.watch_tube")
@patch("teuthology.jobqueue.beanstalk.beanstalk.connect")
@patch("os.path.isdir", return_value=True)
@patch("teuthology.dispatcher.setup_log_file")
def test_main_loop(
Expand Down Expand Up @@ -141,8 +141,8 @@ def test_main_loop(
@patch("beanstalkc.Job", autospec=True)
@patch("teuthology.repo_utils.fetch_qa_suite")
@patch("teuthology.repo_utils.fetch_teuthology")
@patch("teuthology.dispatcher.beanstalk.watch_tube")
@patch("teuthology.dispatcher.beanstalk.connect")
@patch("teuthology.jobqueue.beanstalk.beanstalk.watch_tube")
@patch("teuthology.jobqueue.beanstalk.beanstalk.connect")
@patch("os.path.isdir", return_value=True)
@patch("teuthology.dispatcher.setup_log_file")
def test_main_loop_13925(
Expand Down
4 changes: 2 additions & 2 deletions teuthology/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import teuthology.beanstalk as beanstalk
import teuthology.dispatcher
import teuthology.machines
from teuthology.config import config
from teuthology.lock.query import list_locks

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -215,7 +215,7 @@ def _init(self):

def _update(self):
for machine_type in MACHINE_TYPES:
nodes = list_locks(machine_type=machine_type)
nodes = teuthology.machines.machine_list(machine_type=machine_type)
for up, locked in itertools.product([True, False], [True, False]):
self.metric.labels(machine_type=machine_type, up=up, locked=locked).set(
len([n for n in nodes if n["up"] is up and n["locked"] is locked])
Expand Down
Empty file.
42 changes: 42 additions & 0 deletions teuthology/jobqueue/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Dict, Any, Optional

import abc
import enum


JobSpec = Dict[str, Any]


class QueueDirection(enum.Enum):
IN = 1
OUT = 2
BIDIR = 3


class Job(abc.ABC):
jid: int

@abc.abstractmethod
def bury(self) -> None: ...

@abc.abstractmethod
def delete(self) -> None: ...

@abc.abstractmethod
def job_config(self) -> JobSpec: ...


class JobQueue(abc.ABC):
@abc.abstractmethod
def put(self, job_config: JobSpec) -> int: ...

@abc.abstractmethod
def get(self) -> Optional[Job]: ...

@property
@abc.abstractmethod
def tube(self) -> str: ...

@property
@abc.abstractmethod
def direction(self) -> QueueDirection: ...
68 changes: 68 additions & 0 deletions teuthology/jobqueue/beanstalk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from typing import Optional

import yaml

from teuthology import beanstalk
from teuthology.jobqueue import base


class Job(base.Job):
jid: int

def __init__(self, job) -> None:
self._job = job
self.jid = job.jid

def bury(self) -> None:
self._job.bury()

def delete(self) -> None:
self._job.delete()

def job_config(self) -> base.JobSpec:
return yaml.safe_load(self._job.body)


class JobQueue(base.JobQueue):
def __init__(
self, connection, tube: str, direction: base.QueueDirection
) -> None:
self._connection = connection
self._direction = direction
if direction == base.QueueDirection.IN:
self._connection.use(tube)
self._tube = tube
elif direction == base.QueueDirection.OUT:
self._tube = beanstalk.watch_tube(tube)
else:
raise ValueError(
f'invalid direction for beanstalk job queue: {direction}'
)

def put(self, job_config: base.JobSpec) -> int:
if self._direction != base.QueueDirection.IN:
raise ValueError('not an input queue')
job = yaml.safe_dump(job_config)
jid = beanstalk.put(
job,
ttr=60 * 60 * 24,
priority=job_config['priority'],
)
return jid

def get(self) -> Optional[Job]:
if self._direction != base.QueueDirection.OUT:
raise ValueError('not an output queue')
return Job(self._connection.reserve(timeout=60))

@property
def tube(self) -> str:
return self._tube

@property
def direction(self) -> base.QueueDirection:
return self._direction

@classmethod
def connect(cls, tube: str, direction: base.QueueDirection):
return cls(beanstalk.connect(), tube, direction)
29 changes: 29 additions & 0 deletions teuthology/jobqueue/choice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from teuthology.config import config as teuth_config

from teuthology.jobqueue.base import QueueDirection, JobQueue
from teuthology.jobqueue import beanstalk, sqlite
from teuthology.jobqueue import file as fileq


def from_backend(
backend: str, tube: str, direction: QueueDirection
) -> JobQueue:
if backend in ('beanstalk', ''):
return beanstalk.JobQueue.connect(tube, direction)
if backend.startswith('@'):
return fileq.JobQueue(backend.lstrip('@'), direction)
if backend.startswith('sqlite:'):
return sqlite.JobQueue(backend, tube, direction)
raise ValueError(
f"Unexpected queue backend: {backend!r}"
" (expected 'beanstalk', '@<path-to-file>',"
" or 'sqlite://<path-to-file>'"
)


def from_config(
tube: str, direction: QueueDirection, backend: str = ''
) -> JobQueue:
if not backend:
backend = teuth_config.job_queue_backend or ''
return from_backend(backend, tube, direction)
36 changes: 36 additions & 0 deletions teuthology/jobqueue/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import yaml

from teuthology.jobqueue import base


class JobQueue(base.JobQueue):
def __init__(self, path: str, direction: base.QueueDirection) -> None:
if direction != base.QueueDirection.IN:
raise ValueError('only output supported')
self._base_path = path
self._count_file_path = f'{path}.count'
self._count = 0
try:
with open(self._count_file_path, 'r') as fh:
self._count = int(fh.read() or 0)
except FileNotFoundError:
pass

def put(self, job_config: base.JobSpec) -> int:
jid = self._count = self._count + 1
job_config['job_id'] = str(jid)
job = yaml.safe_dump(job_config)
with open(self._base_path, 'a') as fh:
fh.write('---\n')
fh.write(job)
with open(self._count_file_path, 'w') as fh:
fh.write(str(jid))
print(f'Job scheduled with name {job_config["name"]} and ID {jid}')

@property
def tube(self) -> str:
return ''

@property
def direction(self) -> base.QueueDirection:
return base.QueueDirection.IN
Loading
Loading