From 18408b25510383b44025c9dde9cf4d4813977f52 Mon Sep 17 00:00:00 2001 From: MrCreosote Date: Sun, 19 Dec 2021 21:17:47 -0800 Subject: [PATCH] DATAUP-679: Update job to queued directly after submit (#430) * Update job to queued directly after submit If many jobs are submitted in a batch, the jobs can start running before being updated to queued, which means the jobs never get a queued timestamp. * run black * Bump version, add release notes --- RELEASE_NOTES.md | 4 +++ kbase.yml | 2 +- lib/execution_engine2/db/MongoUtil.py | 8 +++-- .../execution_engine2Impl.py | 2 +- lib/execution_engine2/sdk/EE2Runjob.py | 32 ++++++++----------- test/tests_for_db/ee2_MongoUtil_test.py | 3 +- test/tests_for_sdkmr/EE2Runjob_test.py | 15 +++++---- 7 files changed, 35 insertions(+), 31 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 68376cfdd..750cb8cb9 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,10 @@ # execution_engine2 (ee2) release notes ========================================= +## 0.0.7 +* Fixed a bug that could cause missing `queued` timestamps if many jobs were submitted in a + batch + ## 0.0.6 * Release of MVP diff --git a/kbase.yml b/kbase.yml index 0cee4a309..15f81f845 100644 --- a/kbase.yml +++ b/kbase.yml @@ -8,7 +8,7 @@ service-language: python module-version: - 0.0.5 + 0.0.7 owners: [bsadkhin, tgu2, wjriehl, gaprice] diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index c7feb0f6a..f71d53cc1 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -3,7 +3,7 @@ import time import traceback from contextlib import contextmanager -from typing import Dict, List +from typing import Dict, List, NamedTuple from bson.objectid import ObjectId from mongoengine import connect, connection from pymongo import MongoClient, UpdateOne @@ -16,7 +16,11 @@ ) from lib.execution_engine2.utils.arg_processing import parse_bool -from execution_engine2.sdk.EE2Runjob import JobIdPair + + +class JobIdPair(NamedTuple): + job_id: str + scheduler_id: str class MongoUtil: diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py index 5b6366de5..2be7e6813 100644 --- a/lib/execution_engine2/execution_engine2Impl.py +++ b/lib/execution_engine2/execution_engine2Impl.py @@ -28,7 +28,7 @@ class execution_engine2: # state. A method could easily clobber the state set by another while # the latter method is running. ######################################### noqa - VERSION = "0.0.5" + VERSION = "0.0.7" GIT_URL = "https://github.com/mrcreosote/execution_engine2.git" GIT_COMMIT_HASH = "2ad95ce47caa4f1e7b939651f2b1773840e67a8a" diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py index ec6d3952c..2beed9a7a 100644 --- a/lib/execution_engine2/sdk/EE2Runjob.py +++ b/lib/execution_engine2/sdk/EE2Runjob.py @@ -75,11 +75,6 @@ class PreparedJobParams(NamedTuple): job_id: str -class JobIdPair(NamedTuple): - job_id: str - scheduler_id: str - - from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -312,17 +307,11 @@ def _run_multiple(self, runjob_params: List[Dict]): ).start() return job_ids - def _update_to_queued_multiple(self, job_ids, scheduler_ids): + def _finish_multiple_job_submission(self, job_ids): """ This is called during job submission. If a job is terminated during job submission, we have the chance to re-issue a termination and remove the job from the Job Queue """ - if len(job_ids) != len(scheduler_ids): - raise Exception( - "Need to provide the same amount of job ids and scheduler_ids" - ) - jobs_to_update = list(map(JobIdPair, job_ids, scheduler_ids)) - self.sdkmr.get_mongo_util().update_jobs_to_queued(jobs_to_update) jobs = self.sdkmr.get_mongo_util().get_jobs(job_ids) for job in jobs: @@ -377,14 +366,21 @@ def _submit_multiple(self, job_submission_params): ) raise RuntimeError(error_msg) condor_job_ids.append(condor_job_id) - - self.logger.error(f"It took {time.time() - begin} to submit jobs to condor") - # It took 4.836009502410889 to submit jobs to condor + # Previously the jobs were updated in a batch after submitting all jobs to condor. + # This led to issues where a large job count could result in jobs switching to + # running prior to all jobs being submitted and so the queued timestamp was + # never added to the job record. + self.sdkmr.get_mongo_util().update_job_to_queued(job_id, condor_job_id) + + self.logger.error( + f"It took {time.time() - begin} to submit jobs to condor and update to queued" + ) update_time = time.time() - self._update_to_queued_multiple(job_ids=job_ids, scheduler_ids=condor_job_ids) - # It took 1.9239885807037354 to update jobs - self.logger.error(f"It took {time.time() - update_time} to update jobs ") + self._finish_multiple_job_submission(job_ids=job_ids) + self.logger.error( + f"It took {time.time() - update_time} to finish job submission" + ) return job_ids diff --git a/test/tests_for_db/ee2_MongoUtil_test.py b/test/tests_for_db/ee2_MongoUtil_test.py index 2f6a819c5..32933e99b 100644 --- a/test/tests_for_db/ee2_MongoUtil_test.py +++ b/test/tests_for_db/ee2_MongoUtil_test.py @@ -7,9 +7,8 @@ from bson.objectid import ObjectId -from execution_engine2.db.MongoUtil import MongoUtil +from execution_engine2.db.MongoUtil import MongoUtil, JobIdPair from execution_engine2.db.models.models import Job, JobLog, Status -from execution_engine2.sdk.EE2Runjob import JobIdPair from test.utils_shared.test_utils import ( bootstrap, get_example_job, diff --git a/test/tests_for_sdkmr/EE2Runjob_test.py b/test/tests_for_sdkmr/EE2Runjob_test.py index 6dec768bf..5d2c42f85 100644 --- a/test/tests_for_sdkmr/EE2Runjob_test.py +++ b/test/tests_for_sdkmr/EE2Runjob_test.py @@ -27,7 +27,7 @@ AuthError, InvalidParameterForBatch, ) -from execution_engine2.sdk.EE2Runjob import EE2RunJob, JobPermissions, JobIdPair +from execution_engine2.sdk.EE2Runjob import EE2RunJob, JobPermissions from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from execution_engine2.sdk.job_submission_parameters import ( JobSubmissionParameters, @@ -908,12 +908,13 @@ def _check_common_mock_calls_batch( ) # update to queued state - child_job_pairs = [ - JobIdPair(_JOB_ID_1, _CLUSTER_1), - JobIdPair(_JOB_ID_2, _CLUSTER_2), - ] - mocks[MongoUtil].update_jobs_to_queued.assert_has_calls([call(child_job_pairs)]) - job_ids = [child_job_pair.job_id for child_job_pair in child_job_pairs] + mocks[MongoUtil].update_job_to_queued.assert_has_calls( + [ + call(_JOB_ID_1, _CLUSTER_1), + call(_JOB_ID_2, _CLUSTER_2), + ] + ) + job_ids = [_JOB_ID_1, _JOB_ID_2] mocks[MongoUtil].get_jobs.assert_has_calls([call(job_ids)]) if not terminated_during_submit: