From 52e9ebf9cc35a736575f21087454a63f2e164f8c Mon Sep 17 00:00:00 2001 From: Zack Cerza Date: Mon, 1 Jul 2024 17:00:46 -0600 Subject: [PATCH] Add job expiration dates This feature has two parts: * Specifying expiration dates when scheduling test runs * A global maximum age Expiration dates are provided by passing `--expire` to `teuthology-suite` with a relative value like `1d` (one day), `1w` (one week), or an absolute value like `1999-12-31_23:59:59`. A new configuration item, `max_job_age`, is specified in seconds. This defaults to two weeks. When the dispatcher checks the queue for the next job to run, it will first compare the job's `timestamp` value - which reflects the time the job was scheduled. If more than `max_job_age` seconds have passed, the job is skipped and marked dead. It next checks for an `expire` value; if that value is in the past, the job is skipped and marked dead. Otherwise, it will be run as usual. Signed-off-by: Zack Cerza --- docs/siteconfig.rst | 4 +++ scripts/suite.py | 2 ++ teuthology/config.py | 1 + teuthology/dispatcher/__init__.py | 28 +++++++++++++++ teuthology/dispatcher/test/test_dispatcher.py | 28 +++++++++++++++ teuthology/suite/__init__.py | 3 ++ teuthology/suite/run.py | 34 ++++++++++++++++-- teuthology/suite/test/test_run_.py | 35 ++++++++++++++++++- 8 files changed, 132 insertions(+), 3 deletions(-) diff --git a/docs/siteconfig.rst b/docs/siteconfig.rst index 83c5f1599..effb8219c 100644 --- a/docs/siteconfig.rst +++ b/docs/siteconfig.rst @@ -85,6 +85,10 @@ Here is a sample configuration with many of the options set and documented:: # processes watchdog_interval: 120 + # How old a scheduled job can be, in seconds, before the dispatcher + # considers it 'expired', skipping it. + max_job_age: 1209600 + # How long a scheduled job should be allowed to run, in seconds, before # it is killed by the supervisor process. max_job_time: 259200 diff --git a/scripts/suite.py b/scripts/suite.py index 77561b7e0..b936860be 100644 --- a/scripts/suite.py +++ b/scripts/suite.py @@ -112,6 +112,8 @@ When tests finish or time out, send an email here. May also be specified in ~/.teuthology.yaml as 'results_email' + --expire Do not execute jobs in the run if they have not + completed by this time --rocketchat Comma separated list of Rocket.Chat channels where to send a message when tests finished or time out. To be used with --sleep-before-teardown option. diff --git a/teuthology/config.py b/teuthology/config.py index 3983d3d0f..d3b3ae52d 100644 --- a/teuthology/config.py +++ b/teuthology/config.py @@ -157,6 +157,7 @@ class TeuthologyConfig(YamlConfig): 'job_threshold': 500, 'lab_domain': 'front.sepia.ceph.com', 'lock_server': 'http://paddles.front.sepia.ceph.com/', + 'max_job_age': 1209600, # 2 weeks 'max_job_time': 259200, # 3 days 'nsupdate_url': 'http://nsupdate.front.sepia.ceph.com/update', 'results_server': 'http://paddles.front.sepia.ceph.com/', diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 7cbaf7449..b905bc2d7 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -22,6 +22,7 @@ from teuthology.dispatcher import supervisor from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries from teuthology.lock import ops as lock_ops +from teuthology.util.time import parse_timestamp from teuthology import safepath log = logging.getLogger(__name__) @@ -232,6 +233,8 @@ def match(proc): def prep_job(job_config, log_file_path, archive_dir): job_id = job_config['job_id'] + check_job_expiration(job_config) + safe_archive = safepath.munge(job_config['name']) job_config['worker_log'] = log_file_path archive_path_full = os.path.join( @@ -306,6 +309,31 @@ def prep_job(job_config, log_file_path, archive_dir): return job_config, teuth_bin_path +def check_job_expiration(job_config): + job_id = job_config['job_id'] + expired = False + now = datetime.datetime.now(datetime.timezone.utc) + if expire_str := job_config.get('timestamp'): + expire = parse_timestamp(expire_str) + \ + datetime.timedelta(seconds=teuth_config.max_job_age) + expired = expire < now + if not expired and (expire_str := job_config.get('expire')): + try: + expire = parse_timestamp(expire_str) + expired = expired or expire < now + except ValueError: + log.warning(f"Failed to parse job expiration: {expire_str=}") + pass + if expired: + log.info(f"Skipping job {job_id} because it is expired: {expire_str} is in the past") + report.try_push_job_info( + job_config, + # TODO: Add a 'canceled' status to paddles, and use that. + dict(status='dead'), + ) + raise SkipJob() + + def lock_machines(job_config): report.try_push_job_info(job_config, dict(status='running')) fake_ctx = supervisor.create_fake_context(job_config, block=True) diff --git a/teuthology/dispatcher/test/test_dispatcher.py b/teuthology/dispatcher/test/test_dispatcher.py index e7c59d8bd..8551455e2 100644 --- a/teuthology/dispatcher/test/test_dispatcher.py +++ b/teuthology/dispatcher/test/test_dispatcher.py @@ -7,6 +7,7 @@ from teuthology import dispatcher from teuthology.config import FakeNamespace from teuthology.contextutil import MaxWhileTries +from teuthology.util.time import TIMESTAMP_FMT class TestDispatcher(object): @@ -172,3 +173,30 @@ def test_main_loop_13925( for i in range(len(jobs)): push_call = m_try_push_job_info.call_args_list[i] assert push_call[0][1]['status'] == 'dead' + + @pytest.mark.parametrize( + ["timestamp", "expire", "skip"], + [ + [datetime.timedelta(days=-1), None, False], + [datetime.timedelta(days=-30), None, True], + [None, datetime.timedelta(days=1), False], + [None, datetime.timedelta(days=-1), True], + [datetime.timedelta(days=-1), datetime.timedelta(days=1), False], + [datetime.timedelta(days=1), datetime.timedelta(days=-1), True], + ] + ) + def test_check_job_expiration(self, timestamp, expire, skip): + now = datetime.datetime.now(datetime.timezone.utc) + job_config = dict( + job_id="1", + name="job_name", + ) + if timestamp: + job_config["timestamp"] = (now + timestamp).strftime(TIMESTAMP_FMT) + if expire: + job_config["expire"] = (now + expire).strftime(TIMESTAMP_FMT) + if skip: + with pytest.raises(dispatcher.SkipJob): + dispatcher.check_job_expiration(job_config) + else: + dispatcher.check_job_expiration(job_config) diff --git a/teuthology/suite/__init__.py b/teuthology/suite/__init__.py index 6fc167fab..8a17cf5f1 100644 --- a/teuthology/suite/__init__.py +++ b/teuthology/suite/__init__.py @@ -63,6 +63,9 @@ def process_args(args): elif key == 'subset' and value is not None: # take input string '2/3' and turn into (2, 3) value = tuple(map(int, value.split('/'))) + elif key == 'expire' and value is None: + # Skip empty 'expire' values + continue elif key in ('filter_all', 'filter_in', 'filter_out', 'rerun_statuses'): if not value: value = [] diff --git a/teuthology/suite/run.py b/teuthology/suite/run.py index 600f1568f..d425245b4 100644 --- a/teuthology/suite/run.py +++ b/teuthology/suite/run.py @@ -1,4 +1,5 @@ import copy +import datetime import logging import os import pwd @@ -8,7 +9,6 @@ from humanfriendly import format_timespan -from datetime import datetime from tempfile import NamedTemporaryFile from teuthology import repo_utils @@ -24,7 +24,7 @@ from teuthology.suite.merge import config_merge from teuthology.suite.build_matrix import build_matrix from teuthology.suite.placeholder import substitute_placeholders, dict_templ -from teuthology.util.time import TIMESTAMP_FMT +from teuthology.util.time import parse_offset, parse_timestamp, TIMESTAMP_FMT log = logging.getLogger(__name__) @@ -87,6 +87,15 @@ def create_initial_config(self): :returns: A JobConfig object """ + now = datetime.datetime.now(datetime.UTC) + expires = self.get_expiration() + if expires: + if now > expires: + util.schedule_fail( + f"Refusing to schedule because the expiration date is in the past: {self.args.expire}", + dry_run=self.args.dry_run, + ) + self.os = self.choose_os() self.kernel_dict = self.choose_kernel() ceph_hash = self.choose_ceph_hash() @@ -124,8 +133,29 @@ def create_initial_config(self): suite_relpath=self.args.suite_relpath, flavor=self.args.flavor, ) + if expires: + self.config_input["expires"] = expires.strftime(TIMESTAMP_FMT) return self.build_base_config() + def get_expiration(self, _base_time: datetime.datetime | None = None) -> datetime.datetime | None: + """ + _base_time: For testing, calculate relative offsets from this base time + + :returns: True if the job should run; False if it has expired + """ + log.info(f"Checking for expiration ({self.args.expire})") + expires_str = self.args.expire + if expires_str is None: + return None + now = datetime.datetime.now(datetime.UTC) + if _base_time is None: + _base_time = now + try: + expires = parse_timestamp(expires_str) + except ValueError: + expires = _base_time + parse_offset(expires_str) + return expires + def choose_os(self): os_type = self.args.distro os_version = self.args.distro_version diff --git a/teuthology/suite/test/test_run_.py b/teuthology/suite/test/test_run_.py index abe78a386..9395c4fc3 100644 --- a/teuthology/suite/test/test_run_.py +++ b/teuthology/suite/test/test_run_.py @@ -4,7 +4,7 @@ import contextlib import yaml -from datetime import datetime +from datetime import datetime, timedelta, UTC from mock import patch, call, ANY from io import StringIO from io import BytesIO @@ -90,6 +90,39 @@ def test_branch_nonexistent( with pytest.raises(ScheduleFailError): self.klass(self.args) + @pytest.mark.parametrize( + ["expire", "delta", "result"], + [ + ["1m", timedelta(), True], + ["1m", timedelta(minutes=-2), False], + ["1m", timedelta(minutes=2), True], + ["7d", timedelta(days=-14), False], + ] + ) + @patch('teuthology.repo_utils.fetch_repo') + @patch('teuthology.suite.run.util.git_branch_exists') + @patch('teuthology.suite.run.util.package_version_for_hash') + @patch('teuthology.suite.run.util.git_ls_remote') + def test_expiration( + self, + m_git_ls_remote, + m_package_version_for_hash, + m_git_branch_exists, + m_fetch_repo, + expire, + delta, + result, + ): + m_git_ls_remote.side_effect = 'hash' + m_package_version_for_hash.return_value = 'a_version' + m_git_branch_exists.return_value = True + self.args.expire = expire + obj = self.klass(self.args) + now = datetime.now(UTC) + expires_result = obj.get_expiration(_base_time=datetime.now(UTC) + delta) + assert expires_result is not None + assert (now < expires_result) is result + @patch('teuthology.suite.run.util.fetch_repos') @patch('requests.head') @patch('teuthology.suite.run.util.git_branch_exists')