DATAUP-679: Update job to queued directly after submit (#430)
* Update job to queued directly after submit

If many jobs are submitted in a batch, the jobs can start running before being updated to queued, which means those jobs never get a `queued` timestamp. (A hedged sketch of the revised per-job flow follows the file-change summary below.)

* run black

* Bump version, add release notes
MrCreosote authored Dec 20, 2021
1 parent 0465820 commit 18408b2
Showing 7 changed files with 35 additions and 31 deletions.
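
Before the file-by-file diff, here is a minimal, self-contained sketch of the behaviour this commit introduces in `_submit_multiple`: each job record is marked queued immediately after its HTCondor submission rather than in one batch after the loop. `MongoUtilStub`, `submit_multiple`, and the lambda scheduler are illustrative stand-ins, not the real ee2 classes.

```python
import time


class MongoUtilStub:
    def __init__(self):
        self.queued = {}

    def update_job_to_queued(self, job_id, scheduler_id):
        # Record the queued timestamp right away, before the next submission.
        self.queued[job_id] = (scheduler_id, time.time())


def submit_multiple(job_ids, submit_to_condor, mongo_util):
    condor_job_ids = []
    for job_id in job_ids:
        condor_job_id = submit_to_condor(job_id)  # may take seconds per job
        condor_job_ids.append(condor_job_id)
        # Old behaviour: one batch update after this loop, which a fast-starting
        # job could beat, leaving it without a queued timestamp.
        # New behaviour: mark each job queued as soon as it is submitted.
        mongo_util.update_job_to_queued(job_id, condor_job_id)
    return condor_job_ids


mongo = MongoUtilStub()
submit_multiple(["job-1", "job-2"], lambda j: f"cluster-{j}", mongo)
assert set(mongo.queued) == {"job-1", "job-2"}
```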
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
@@ -1,6 +1,10 @@
# execution_engine2 (ee2) release notes
=========================================

## 0.0.7
* Fixed a bug that could cause missing `queued` timestamps if many jobs were submitted in a
batch

## 0.0.6
* Release of MVP

2 changes: 1 addition & 1 deletion kbase.yml
@@ -8,7 +8,7 @@ service-language:
python

module-version:
0.0.5
0.0.7

owners:
[bsadkhin, tgu2, wjriehl, gaprice]
8 changes: 6 additions & 2 deletions lib/execution_engine2/db/MongoUtil.py
@@ -3,7 +3,7 @@
import time
import traceback
from contextlib import contextmanager
from typing import Dict, List
from typing import Dict, List, NamedTuple
from bson.objectid import ObjectId
from mongoengine import connect, connection
from pymongo import MongoClient, UpdateOne
@@ -16,7 +16,11 @@
)

from lib.execution_engine2.utils.arg_processing import parse_bool
from execution_engine2.sdk.EE2Runjob import JobIdPair


class JobIdPair(NamedTuple):
    job_id: str
    scheduler_id: str


class MongoUtil:
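
The hunk above moves the `JobIdPair` NamedTuple into `MongoUtil` and drops the `from execution_engine2.sdk.EE2Runjob import JobIdPair` import, so the DB helper no longer imports it from the SDK layer. A small sketch, using made-up ids, of how such pairs are built for the pre-existing batch updater `update_jobs_to_queued`:

```python
from typing import NamedTuple


# Stand-in mirroring the JobIdPair NamedTuple this commit moves into MongoUtil;
# the ids below are invented for illustration.
class JobIdPair(NamedTuple):
    job_id: str
    scheduler_id: str


job_ids = ["61c02d9b0000000000000001", "61c02d9b0000000000000002"]
scheduler_ids = ["12345.0", "12346.0"]

# This mirrors how the removed EE2Runjob code built the payload for the
# batch updater MongoUtil.update_jobs_to_queued:
pairs = list(map(JobIdPair, job_ids, scheduler_ids))
assert pairs[0].scheduler_id == "12345.0"
```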
2 changes: 1 addition & 1 deletion lib/execution_engine2/execution_engine2Impl.py
@@ -28,7 +28,7 @@ class execution_engine2:
# state. A method could easily clobber the state set by another while
# the latter method is running.
######################################### noqa
VERSION = "0.0.5"
VERSION = "0.0.7"
GIT_URL = "https://github.com/mrcreosote/execution_engine2.git"
GIT_COMMIT_HASH = "2ad95ce47caa4f1e7b939651f2b1773840e67a8a"

32 changes: 14 additions & 18 deletions lib/execution_engine2/sdk/EE2Runjob.py
@@ -75,11 +75,6 @@ class PreparedJobParams(NamedTuple):
    job_id: str


class JobIdPair(NamedTuple):
    job_id: str
    scheduler_id: str


from typing import TYPE_CHECKING

if TYPE_CHECKING:
@@ -312,17 +307,11 @@ def _run_multiple(self, runjob_params: List[Dict]):
).start()
return job_ids

def _update_to_queued_multiple(self, job_ids, scheduler_ids):
def _finish_multiple_job_submission(self, job_ids):
"""
This is called during job submission. If a job is terminated during job submission,
we have the chance to re-issue a termination and remove the job from the Job Queue
"""
if len(job_ids) != len(scheduler_ids):
raise Exception(
"Need to provide the same amount of job ids and scheduler_ids"
)
jobs_to_update = list(map(JobIdPair, job_ids, scheduler_ids))
self.sdkmr.get_mongo_util().update_jobs_to_queued(jobs_to_update)
jobs = self.sdkmr.get_mongo_util().get_jobs(job_ids)

for job in jobs:
@@ -377,14 +366,21 @@ def _submit_multiple(self, job_submission_params):
)
raise RuntimeError(error_msg)
condor_job_ids.append(condor_job_id)

self.logger.error(f"It took {time.time() - begin} to submit jobs to condor")
# It took 4.836009502410889 to submit jobs to condor
# Previously the jobs were updated in a batch after submitting all jobs to condor.
# This led to issues where a large job count could result in jobs switching to
# running prior to all jobs being submitted and so the queued timestamp was
# never added to the job record.
self.sdkmr.get_mongo_util().update_job_to_queued(job_id, condor_job_id)

self.logger.error(
f"It took {time.time() - begin} to submit jobs to condor and update to queued"
)

update_time = time.time()
self._update_to_queued_multiple(job_ids=job_ids, scheduler_ids=condor_job_ids)
# It took 1.9239885807037354 to update jobs
self.logger.error(f"It took {time.time() - update_time} to update jobs ")
self._finish_multiple_job_submission(job_ids=job_ids)
self.logger.error(
f"It took {time.time() - update_time} to finish job submission"
)

return job_ids

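
With the queued update now done per job inside the submission loop, `_update_to_queued_multiple` becomes `_finish_multiple_job_submission`, which only re-checks the submitted jobs for terminations issued while the batch was still being submitted. A hedged sketch of that post-submission pass, using illustrative `Job` and `cancel_job` stand-ins rather than the real ee2 models:

```python
from dataclasses import dataclass


@dataclass
class Job:
    job_id: str
    status: str


def finish_multiple_job_submission(jobs, cancel_job):
    # Re-check each job after submission; anything cancelled while the batch
    # was still being submitted gets its termination re-issued so it is also
    # removed from the scheduler queue.
    for job in jobs:
        if job.status == "terminated":
            cancel_job(job.job_id)


cancelled = []
finish_multiple_job_submission(
    [Job("a", "queued"), Job("b", "terminated")], cancelled.append
)
assert cancelled == ["b"]
```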
3 changes: 1 addition & 2 deletions test/tests_for_db/ee2_MongoUtil_test.py
@@ -7,9 +7,8 @@

from bson.objectid import ObjectId

from execution_engine2.db.MongoUtil import MongoUtil
from execution_engine2.db.MongoUtil import MongoUtil, JobIdPair
from execution_engine2.db.models.models import Job, JobLog, Status
from execution_engine2.sdk.EE2Runjob import JobIdPair
from test.utils_shared.test_utils import (
bootstrap,
get_example_job,
15 changes: 8 additions & 7 deletions test/tests_for_sdkmr/EE2Runjob_test.py
@@ -27,7 +27,7 @@
AuthError,
InvalidParameterForBatch,
)
from execution_engine2.sdk.EE2Runjob import EE2RunJob, JobPermissions, JobIdPair
from execution_engine2.sdk.EE2Runjob import EE2RunJob, JobPermissions
from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner
from execution_engine2.sdk.job_submission_parameters import (
JobSubmissionParameters,
@@ -908,12 +908,13 @@ def _check_common_mock_calls_batch(
)

# update to queued state
child_job_pairs = [
JobIdPair(_JOB_ID_1, _CLUSTER_1),
JobIdPair(_JOB_ID_2, _CLUSTER_2),
]
mocks[MongoUtil].update_jobs_to_queued.assert_has_calls([call(child_job_pairs)])
job_ids = [child_job_pair.job_id for child_job_pair in child_job_pairs]
mocks[MongoUtil].update_job_to_queued.assert_has_calls(
[
call(_JOB_ID_1, _CLUSTER_1),
call(_JOB_ID_2, _CLUSTER_2),
]
)
job_ids = [_JOB_ID_1, _JOB_ID_2]
mocks[MongoUtil].get_jobs.assert_has_calls([call(job_ids)])

if not terminated_during_submit:
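
For reference, a minimal sketch of the assertion style the updated test relies on: `unittest.mock` records one `update_job_to_queued` call per child job, replacing the single batched `update_jobs_to_queued` assertion. The ids below are illustrative, not the test's constants.

```python
from unittest.mock import MagicMock, call

mongo_util = MagicMock()

# Code under test would now call the per-job updater once per child job:
mongo_util.update_job_to_queued("job-1", "cluster-1")
mongo_util.update_job_to_queued("job-2", "cluster-2")

# The test then asserts one call per (job_id, scheduler_id) pair instead of a
# single batched update_jobs_to_queued call with JobIdPair tuples.
mongo_util.update_job_to_queued.assert_has_calls(
    [call("job-1", "cluster-1"), call("job-2", "cluster-2")]
)
```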
