Skip to content

Commit

Permalink
teuthology/queue: Single command for queue operations
Browse files Browse the repository at this point in the history
Makes the same teuthology-queue commands work regardless of the queue backend, Paddles or Beanstalk.

Signed-off-by: Aishwarya Mathuria <[email protected]>
  • Loading branch information
amathuria committed Oct 9, 2024
1 parent 74bd369 commit be34af2
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 211 deletions.
1 change: 0 additions & 1 deletion scripts/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
--queue-backend BACKEND choose between paddles and beanstalk
"""

import docopt
import sys

import teuthology.dispatcher.supervisor
Expand Down
45 changes: 0 additions & 45 deletions scripts/paddles_queue.py

This file was deleted.

15 changes: 12 additions & 3 deletions scripts/queue.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import docopt

import teuthology.config
import teuthology.queue.beanstalk
import teuthology.queue.paddles
from teuthology.config import config

doc = """
usage: teuthology-queue -h
teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
teuthology-queue [-r] -m MACHINE_TYPE
teuthology-queue -m MACHINE_TYPE -D PATTERN
teuthology-queue -p SECONDS [-m MACHINE_TYPE]
teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER]
teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
Expand All @@ -29,9 +30,17 @@
-p, --pause SECONDS Pause queues for a number of seconds. A value of 0
will unpause. If -m is passed, pause that queue,
otherwise pause all queues.
-P, --priority PRIORITY
Change priority of queued jobs (only in Paddles queues)
-U, --user USER User who owns the jobs
-R, --run-name RUN_NAME
Used to change priority of all jobs in the run.
"""


def main():
args = docopt.docopt(doc)
teuthology.queue.main(args)
if config.backend == 'beanstalk':
teuthology.queue.beanstalk.main(args)
else:
teuthology.queue.paddles.main(args)
7 changes: 5 additions & 2 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ def main(args):
"There is already a teuthology-dispatcher process running:"
f" {procs}"
)
verbose = args["--verbose"]
machine_type = args["--machine-type"]
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]
exit_on_empty_queue = args["--exit-on-empty-queue"]
backend = args['--queue-backend']
Expand All @@ -111,6 +109,8 @@ def main(args):
if backend == 'beanstalk':
connection = beanstalk.connect()
beanstalk.watch_tube(connection, machine_type)
elif backend == 'paddles':
report.create_machine_type_queue(machine_type)

result_proc = None

Expand Down Expand Up @@ -152,6 +152,9 @@ def main(args):
else:
job = report.get_queued_job(machine_type)
if job is None:
if exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
break
continue
job = clean_config(job)
report.try_push_job_info(job, dict(status='running'))
Expand Down
7 changes: 3 additions & 4 deletions teuthology/dispatcher/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
if teuth_config.results_server:
try:
report.try_delete_jobs(job_config['name'], job_config['job_id'])
except Exception as e:
log.warning("Unable to delete job %s, exception occurred: %s",
job_config['job_id'], e)
except Exception:
log.exception("Unable to delete job %s", job_config['job_id'])
job_archive = os.path.join(archive_dir, safe_archive)
args = [
os.path.join(teuth_bin_path, 'teuthology-results'),
Expand Down Expand Up @@ -142,7 +141,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
'--archive', job_config['archive_path'],
'--name', job_config['name'],
])
if 'description' in job_config:
if job_config.get('description') is not None:
arg.extend(['--description', job_config['description']])
job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
arg.extend(['--', job_archive])
Expand Down
2 changes: 0 additions & 2 deletions teuthology/kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
from teuthology.queue import beanstalk
from teuthology import report
from teuthology.lock import ops as lock_ops
from teuthology import beanstalk
from teuthology.config import config
from teuthology import misc

log = logging.getLogger(__name__)

Expand Down
106 changes: 0 additions & 106 deletions teuthology/queue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,106 +0,0 @@
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology import report
from teuthology.config import config

log = logging.getLogger(__name__)

def print_progress(index, total, message=None):
msg = "{m} ".format(m=message) if message else ''
sys.stderr.write("{msg}{i}/{total}\r".format(
msg=msg, i=index, total=total))
sys.stderr.flush()

def end_progress():
sys.stderr.write('\n')
sys.stderr.flush()

class JobProcessor(object):
def __init__(self):
self.jobs = OrderedDict()

def add_job(self, job_id, job_config, job_obj=None):
job_id = str(job_id)

job_dict = dict(
index=(len(self.jobs) + 1),
job_config=job_config,
)
if job_obj:
job_dict['job_obj'] = job_obj
self.jobs[job_id] = job_dict

self.process_job(job_id)

def process_job(self, job_id):
pass

def complete(self):
pass


class JobPrinter(JobProcessor):
def __init__(self, show_desc=False, full=False):
super(JobPrinter, self).__init__()
self.show_desc = show_desc
self.full = full

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_index = self.jobs[job_id]['index']
job_priority = job_config['priority']
job_name = job_config['name']
job_desc = job_config['description']
print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
i=job_index,
pri=job_priority,
job_id=job_id,
job_name=job_name,
))
if self.full:
pprint.pprint(job_config)
elif job_desc and self.show_desc:
for desc in job_desc.split():
print('\t {}'.format(desc))


class RunPrinter(JobProcessor):
def __init__(self):
super(RunPrinter, self).__init__()
self.runs = list()

def process_job(self, job_id):
run = self.jobs[job_id]['job_config']['name']
if run not in self.runs:
self.runs.append(run)
print(run)


class JobDeleter(JobProcessor):
def __init__(self, pattern):
self.pattern = pattern
super(JobDeleter, self).__init__()

def add_job(self, job_id, job_config, job_obj=None):
job_name = job_config['name']
if self.pattern in job_name:
super(JobDeleter, self).add_job(job_id, job_config, job_obj)

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_name = job_config['name']
print('Deleting {job_name}/{job_id}'.format(
job_id=job_id,
job_name=job_name,
))
report.try_delete_jobs(job_name, job_id)


def main(args):
if config.backend == 'paddles':
paddles.main(args)
else:
beanstalk.main(args)
16 changes: 7 additions & 9 deletions teuthology/queue/beanstalk.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import beanstalkc
import yaml
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology.config import config
from teuthology import report
from teuthology.queue import util


log = logging.getLogger(__name__)

Expand Down Expand Up @@ -47,7 +45,7 @@ def callback(jobs_dict)
# Try to figure out a sane timeout based on how many jobs are in the queue
timeout = job_count / 2000.0 * 60
for i in range(1, job_count + 1):
print_progress(i, job_count, "Loading")
util.print_progress(i, job_count, "Loading")
job = connection.reserve(timeout=timeout)
if job is None or job.body is None:
continue
Expand All @@ -57,7 +55,7 @@ def callback(jobs_dict)
if pattern is not None and pattern not in job_name:
continue
processor.add_job(job_id, job_config, job)
end_progress()
util.end_progress()
processor.complete()


Expand Down Expand Up @@ -105,13 +103,13 @@ def main(args):
pause_tube(connection, machine_type, pause_duration)
elif delete:
walk_jobs(connection, machine_type,
JobDeleter(delete))
util.JobDeleter(delete))
elif runs:
walk_jobs(connection, machine_type,
RunPrinter())
util.RunPrinter())
else:
walk_jobs(connection, machine_type,
JobPrinter(show_desc=show_desc, full=full))
util.JobPrinter(show_desc=show_desc, full=full))
except KeyboardInterrupt:
log.info("Interrupted.")
finally:
Expand Down
Loading

0 comments on commit be34af2

Please sign in to comment.