From 0465820b47fa623bca8cf891fd19a9359bc21168 Mon Sep 17 00:00:00 2001 From: MrCreosote Date: Sun, 19 Dec 2021 20:36:12 -0800 Subject: [PATCH] DATAUP-679: Add MongoUtil method to update a single job to queued (#429) * Add MongoUtil method to update a single job to queued * run black --- lib/execution_engine2/db/MongoUtil.py | 36 ++++++++++++++++++++++++ test/tests_for_db/ee2_MongoUtil_test.py | 37 +++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index 88527d029..c7feb0f6a 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -271,6 +271,42 @@ def check_if_already_finished(job_status): return True return False + def update_job_to_queued( + self, job_id: str, scheduler_id: str, scheduler_type: str = "condor" + ) -> None: + f""" + * Updates a {Status.created.value} job to queued and sets scheduler state. + Always sets scheduler state, but will only update to queued if the job is in the + {Status.created.value} state. + :param job_id: the ID of the job. + :param scheduler_id: the scheduler's job ID for the job. + :param scheduler_type: The scheduler this job was queued in, default condor + """ + if not job_id or not scheduler_id or not scheduler_type: + raise ValueError("None of the 3 arguments can be falsy") + # could also test that the job ID is a valid job ID rather than having mongo throw an + # error + mongo_collection = self.config["mongo-jobs-collection"] + queue_time_now = time.time() + with self.pymongo_client(mongo_collection) as pymongo_client: + ee2_jobs_col = pymongo_client[self.mongo_database][mongo_collection] + # should we check that the job was updated and do something if it wasn't? + ee2_jobs_col.update_one( + {"_id": ObjectId(job_id), "status": Status.created.value}, + {"$set": {"status": Status.queued.value, "queued": queue_time_now}}, + ) + # originally had a single query, but seems safer to always record the scheduler + # state no matter the state of the job + ee2_jobs_col.update_one( + {"_id": ObjectId(job_id)}, + { + "$set": { + "scheduler_id": scheduler_id, + "scheduler_type": scheduler_type, + } + }, + ) + def update_jobs_to_queued( self, job_id_pairs: List[JobIdPair], scheduler_type: str = "condor" ) -> None: diff --git a/test/tests_for_db/ee2_MongoUtil_test.py b/test/tests_for_db/ee2_MongoUtil_test.py index c13388a8e..2f6a819c5 100644 --- a/test/tests_for_db/ee2_MongoUtil_test.py +++ b/test/tests_for_db/ee2_MongoUtil_test.py @@ -3,6 +3,7 @@ import os import time import unittest +from pytest import raises from bson.objectid import ObjectId @@ -13,6 +14,8 @@ bootstrap, get_example_job, read_config_into_dict, + assert_exception_correct, + assert_close_to_now, ) from tests_for_db.mongo_test_helper import MongoTestHelper @@ -70,6 +73,40 @@ def test_insert_jobs(self): for i, retrieved_job in enumerate(retrieved_jobs): assert jobs_to_insert[i].to_json() == retrieved_job.to_json() + def test_update_job_to_queued_fail_with_bad_args(self): + jid = "aaaaaaaaaaaaaaaaaaaaaaaa" + err = ValueError("None of the 3 arguments can be falsy") + self.update_job_to_queued_fail(None, "sid", "sch", err) + self.update_job_to_queued_fail("", "sid", "sch", err) + self.update_job_to_queued_fail(jid, None, "sch", err) + self.update_job_to_queued_fail(jid, "", "sch", err) + self.update_job_to_queued_fail(jid, "sid", None, err) + self.update_job_to_queued_fail(jid, "sid", "", err) + + def update_job_to_queued_fail(self, job_id, schd_id, schd, expected): + with raises(Exception) as got: + self.getMongoUtil().update_job_to_queued(job_id, schd_id, schd) + assert_exception_correct(got.value, expected) + + def test_update_job_to_queued(self): + for state in Status: + j = get_example_job(status=state.value) + j.scheduler_id = None + j.save() + assert j.scheduler_id is None + + self.getMongoUtil().update_job_to_queued(j.id, "schdID", "condenast") + j.reload() + assert_close_to_now(j.updated) + assert j.scheduler_id == "schdID" + assert j.scheduler_type == "condenast" + if state == Status.created: + assert_close_to_now(j.queued) + assert j.status == Status.queued.value + else: + assert j.queued is None + assert j.status == state.value + def test_update_jobs_enmasse(self): """Check to see that created jobs get updated to queued""" for state in Status: