diff --git a/Dockerfile b/Dockerfile index dfba2f2b5..4939fde50 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,6 +23,10 @@ RUN DEBIAN_FRONTEND=noninteractive wget -qO - https://research.cs.wisc.edu/htcon RUN cd /opt \ && git clone https://github.com/kbase/jars \ && cd - + +# Remove due to cve-2021-4104 issue in spin (log4j) +RUN rm /opt/jars/lib/jars/dockerjava/docker-java-shaded-3.0.14.jar + # install mongodb RUN apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2930ADAE8CAF5059EE73BB4B58712A2291FA4AD5 \ diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 750cb8cb9..62948402e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,11 @@ # execution_engine2 (ee2) release notes ========================================= +## 0.0.8 +* Fixed a bug that could, seemingly rarely, cause job and log updates to be applied to the + wrong Mongo collection. +* Removed docker shaded jar that causes log4j scan to appear positive with Trivy + ## 0.0.7 * Fixed a bug that could cause missing `queued` timestamps if many jobs were submitted in a batch diff --git a/kbase.yml b/kbase.yml index 15f81f845..42615d27b 100644 --- a/kbase.yml +++ b/kbase.yml @@ -8,7 +8,7 @@ service-language: python module-version: - 0.0.7 + 0.0.8 owners: [bsadkhin, tgu2, wjriehl, gaprice] diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index f71d53cc1..caf77266e 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -6,7 +6,7 @@ from typing import Dict, List, NamedTuple from bson.objectid import ObjectId from mongoengine import connect, connection -from pymongo import MongoClient, UpdateOne +from pymongo import MongoClient from pymongo.errors import ServerSelectionTimeoutError from execution_engine2.db.models.models import JobLog, Job, Status, TerminatedCode @@ -33,7 +33,8 @@ def __init__(self, config: Dict): self.mongo_pass = config["mongo-password"] self.retry_rewrites = parse_bool(config["mongo-retry-rewrites"]) self.mongo_authmechanism = config["mongo-authmechanism"] - self.mongo_collection = None + self._col_jobs = config["mongo-jobs-collection"] + self._col_logs = config["mongo-logs-collection"] self._start_local_service() self.logger = logging.getLogger("ee2") self.pymongoc = self._get_pymongo_client() @@ -155,16 +156,6 @@ def _get_collection( return pymongo_client, mongoengine_client - @contextmanager - def pymongo_client(self, mongo_collection): - """ - Instantiates a mongo client to be used as a context manager - Closes the connection at the end - :return: - """ - self.mongo_collection = mongo_collection - yield self.pymongoc - def get_workspace_jobs(self, workspace_id): with self.mongo_engine_connection(): job_ids = [str(job.id) for job in Job.objects(wsid=workspace_id)] @@ -172,24 +163,21 @@ def get_workspace_jobs(self, workspace_id): def get_job_log_pymongo(self, job_id: str = None): - mongo_collection = self.config["mongo-logs-collection"] - - with self.pymongo_client(mongo_collection) as pymongo_client: - job_log_col = pymongo_client[self.mongo_database][self.mongo_collection] - try: - find_filter = {"_id": ObjectId(job_id)} - job_log = job_log_col.find_one(find_filter) - except Exception as e: - error_msg = "Unable to find job\n" - error_msg += "ERROR -- {}:\n{}".format( - e, "".join(traceback.format_exception(None, e, e.__traceback__)) - ) - raise ValueError(error_msg) + job_log_col = self.pymongoc[self.mongo_database][self._col_logs] + try: + find_filter = {"_id": ObjectId(job_id)} + job_log = job_log_col.find_one(find_filter) + except Exception as e: + error_msg = "Unable to find job\n" + error_msg += "ERROR -- {}:\n{}".format( + e, "".join(traceback.format_exception(None, e, e.__traceback__)) + ) + raise ValueError(error_msg) - if not job_log: - raise RecordNotFoundException( - "Cannot find job log with id: {}".format(job_id) - ) + if not job_log: + raise RecordNotFoundException( + "Cannot find job log with id: {}".format(job_id) + ) return job_log @@ -290,88 +278,24 @@ def update_job_to_queued( raise ValueError("None of the 3 arguments can be falsy") # could also test that the job ID is a valid job ID rather than having mongo throw an # error - mongo_collection = self.config["mongo-jobs-collection"] - queue_time_now = time.time() - with self.pymongo_client(mongo_collection) as pymongo_client: - ee2_jobs_col = pymongo_client[self.mongo_database][mongo_collection] - # should we check that the job was updated and do something if it wasn't? - ee2_jobs_col.update_one( - {"_id": ObjectId(job_id), "status": Status.created.value}, - {"$set": {"status": Status.queued.value, "queued": queue_time_now}}, - ) - # originally had a single query, but seems safer to always record the scheduler - # state no matter the state of the job - ee2_jobs_col.update_one( - {"_id": ObjectId(job_id)}, - { - "$set": { - "scheduler_id": scheduler_id, - "scheduler_type": scheduler_type, - } - }, - ) - - def update_jobs_to_queued( - self, job_id_pairs: List[JobIdPair], scheduler_type: str = "condor" - ) -> None: - f""" - * Adds scheduler id to list of jobs - * Updates a list of {Status.created.value} jobs to queued. Does not work on jobs that already have gone through any other - status transition. If the record is not in the {Status.created.value} status, nothing will happen - :param job_id_pairs: A list of pairs of Job Ids and Scheduler Ids - :param scheduler_type: The scheduler this job was queued in, default condor - """ - - bulk_update_scheduler_jobs = [] - bulk_update_created_to_queued = [] queue_time_now = time.time() - for job_id_pair in job_id_pairs: - if job_id_pair.job_id is None: - raise ValueError( - f"Provided a bad job_id_pair, missing job_id for {job_id_pair.scheduler_id}" - ) - elif job_id_pair.scheduler_id is None: - raise ValueError( - f"Provided a bad job_id_pair, missing scheduler_id for {job_id_pair.job_id}" - ) - - bulk_update_scheduler_jobs.append( - UpdateOne( - { - "_id": ObjectId(job_id_pair.job_id), - }, - { - "$set": { - "scheduler_id": job_id_pair.scheduler_id, - "scheduler_type": scheduler_type, - } - }, - ) - ) - bulk_update_created_to_queued.append( - UpdateOne( - { - "_id": ObjectId(job_id_pair.job_id), - "status": Status.created.value, - }, - { - "$set": { - "status": Status.queued.value, - "queued": queue_time_now, - } - }, - ) - ) - # Update provided jobs with scheduler id. Then only update non terminated jobs into updated status. - mongo_collection = self.config["mongo-jobs-collection"] - - if bulk_update_scheduler_jobs: - with self.pymongo_client(mongo_collection) as pymongo_client: - ee2_jobs_col = pymongo_client[self.mongo_database][mongo_collection] - # Bulk Update to add scheduler ids - ee2_jobs_col.bulk_write(bulk_update_scheduler_jobs, ordered=False) - # Bulk Update to add queued status ids - ee2_jobs_col.bulk_write(bulk_update_created_to_queued, ordered=False) + ee2_jobs_col = self.pymongoc[self.mongo_database][self._col_jobs] + # should we check that the job was updated and do something if it wasn't? + ee2_jobs_col.update_one( + {"_id": ObjectId(job_id), "status": Status.created.value}, + {"$set": {"status": Status.queued.value, "queued": queue_time_now}}, + ) + # originally had a single query, but seems safer to always record the scheduler + # state no matter the state of the job + ee2_jobs_col.update_one( + {"_id": ObjectId(job_id)}, + { + "$set": { + "scheduler_id": scheduler_id, + "scheduler_type": scheduler_type, + } + }, + ) def cancel_job(self, job_id=None, terminated_code=None): """ @@ -542,26 +466,6 @@ def insert_jobs(self, jobs_to_insert: List[Job]) -> List[ObjectId]: inserted = Job.objects.insert(doc_or_docs=jobs_to_insert, load_bulk=False) return inserted - def insert_one(self, doc): - """ - insert a doc into collection - """ - self.logger.debug("start inserting document") - - with self.pymongo_client(self.mongo_collection) as pymongo_client: - try: - rec = pymongo_client[self.mongo_database][ - self.mongo_collection - ].insert_one(doc) - except Exception as e: - error_msg = "Cannot insert doc\n" - error_msg += "ERROR -- {}:\n{}".format( - e, "".join(traceback.format_exception(None, e, e.__traceback__)) - ) - raise ValueError(error_msg) - - return rec.inserted_id - def _push_job_logs(self, log_lines: JobLog, job_id: str, record_count: int): """append a list of job logs, and update the record count""" @@ -574,80 +478,15 @@ def _push_job_logs(self, log_lines: JobLog, job_id: str, record_count: int): "updated": time.time(), } update = {"$push": push_op, "$set": set_op} - with self.pymongo_client(self.mongo_collection) as pymongo_client: - job_col = pymongo_client[self.mongo_database][self.mongo_collection] - try: - job_col.update_one(update_filter, update, upsert=False) - except Exception as e: - error_msg = "Cannot update doc\n ERROR -- {}:\n{}".format( - e, "".join(traceback.format_exception(None, e, e.__traceback__)) - ) - raise ValueError(error_msg) - - slc = job_col.find_one({"_id": ObjectId(job_id)}).get("stored_line_count") - - return slc - - def update_one(self, doc, job_id): - """ - update existing records or create if they do not exist - https://docs.mongodb.com/manual/reference/operator/update/set/ - """ - with self.pymongo_client(self.mongo_collection) as pymongo_client: - job_col = pymongo_client[self.mongo_database][self.mongo_collection] - try: - update_filter = {"_id": ObjectId(job_id)} - update = {"$set": doc} - job_col.update_one(update_filter, update, upsert=True) - except Exception as e: - error_msg = "Connot update doc\n" - error_msg += "ERROR -- {}:\n{}".format( - e, "".join(traceback.format_exception(None, e, e.__traceback__)) - ) - raise ValueError(error_msg) - - return True - - def delete_one(self, job_id): - """ - delete a doc by _id - """ - self.logger.debug("start deleting document") - with self.pymongo_client(self.mongo_collection) as pymongo_client: - job_col = pymongo_client[self.mongo_database][self.mongo_collection] - try: - delete_filter = {"_id": ObjectId(job_id)} - job_col.delete_one(delete_filter) - except Exception as e: - error_msg = "Connot delete doc\n" - error_msg += "ERROR -- {}:\n{}".format( - e, "".join(traceback.format_exception(None, e, e.__traceback__)) - ) - raise ValueError(error_msg) - - return True - - def find_in(self, elements, field_name, projection=None, batch_size=1000): - """ - return cursor that contains docs which field column is in elements - """ - self.logger.debug("start querying MongoDB") - - with self.pymongo_client(self.mongo_collection) as pymongo_client: - job_col = pymongo_client[self.mongo_database][self.mongo_collection] - try: - result = job_col.find( - {field_name: {"$in": elements}}, - projection=projection, - batch_size=batch_size, - ) - except Exception as e: - error_msg = "Connot query doc\n" - error_msg += "ERROR -- {}:\n{}".format( - e, "".join(traceback.format_exception(None, e, e.__traceback__)) - ) - raise ValueError(error_msg) + job_col = self.pymongoc[self.mongo_database][self._col_logs] + try: + job_col.update_one(update_filter, update, upsert=False) + except Exception as e: + error_msg = "Cannot update doc\n ERROR -- {}:\n{}".format( + e, "".join(traceback.format_exception(None, e, e.__traceback__)) + ) + raise ValueError(error_msg) - self.logger.debug("returned {} results".format(result.count())) + slc = job_col.find_one({"_id": ObjectId(job_id)}).get("stored_line_count") - return result + return slc diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py index 2be7e6813..15005b031 100644 --- a/lib/execution_engine2/execution_engine2Impl.py +++ b/lib/execution_engine2/execution_engine2Impl.py @@ -28,7 +28,7 @@ class execution_engine2: # state. A method could easily clobber the state set by another while # the latter method is running. ######################################### noqa - VERSION = "0.0.7" + VERSION = "0.0.8" GIT_URL = "https://github.com/mrcreosote/execution_engine2.git" GIT_COMMIT_HASH = "2ad95ce47caa4f1e7b939651f2b1773840e67a8a" diff --git a/test/tests_for_db/ee2_MongoUtil_test.py b/test/tests_for_db/ee2_MongoUtil_test.py index 32933e99b..4cd973e60 100644 --- a/test/tests_for_db/ee2_MongoUtil_test.py +++ b/test/tests_for_db/ee2_MongoUtil_test.py @@ -55,7 +55,6 @@ def test_init_ok(self): "mongo_user", "mongo_pass", "mongo_authmechanism", - "mongo_collection", ] mongo_util = self.getMongoUtil() self.assertTrue(set(class_attri) <= set(mongo_util.__dict__.keys())) @@ -106,74 +105,6 @@ def test_update_job_to_queued(self): assert j.queued is None assert j.status == state.value - def test_update_jobs_enmasse(self): - """Check to see that created jobs get updated to queued""" - for state in Status: - job = get_example_job(status=Status.created.value, scheduler_id=None) - job2 = get_example_job(status=state.value, scheduler_id=None) - job3 = get_example_job(status=state.value, scheduler_id=None) - jobs = [job, job2, job3] - - for j in jobs: - j.scheduler_id = None - j.save() - assert j.scheduler_id is None - - job_ids = [job.id, job2.id, job3.id] - scheduler_ids = ["humpty", "dumpty", "alice"] - jobs_to_update = list(map(JobIdPair, job_ids, scheduler_ids)) - - now_ms = time.time() - self.getMongoUtil().update_jobs_to_queued(jobs_to_update) - job.reload() - job2.reload() - job3.reload() - - # Check that sched ids are set - for i, val in enumerate(scheduler_ids): - assert jobs[i].scheduler_id == val - assert jobs[i].scheduler_type == "condor" - - # Checks that a timestamp in seconds since the epoch is within a second of the current time. - for j in jobs: - assert now_ms + 1 > j.updated - assert now_ms - 1 < j.updated - - # First job always should transition to queued - assert job.status == Status.queued.value - - # Created jobs should transition - if state.value == Status.created.value: - assert all(j.status == Status.queued.value for j in [job, job2, job3]) - - else: - # Don't change their state - assert all(j.status == state.value for j in [job2, job3]) - - def test_update_jobs_enmasse_bad_job_pairs(self): - job = get_example_job(status=Status.created.value).save() - job2 = get_example_job(status=Status.created.value).save() - job3 = get_example_job(status=Status.created.value).save() - job_ids = [job.id, job2.id, job3.id] - scheduler_ids = [job.scheduler_id, job2.scheduler_id, None] - job_id_pairs = list(map(JobIdPair, job_ids, scheduler_ids)) - - with self.assertRaisesRegex( - expected_exception=ValueError, - expected_regex=f"Provided a bad job_id_pair, missing scheduler_id for {job3.id}", - ): - self.getMongoUtil().update_jobs_to_queued(job_id_pairs) - - job_ids = [job.id, job2.id, None] - scheduler_ids = [job.scheduler_id, job2.scheduler_id, job3.scheduler_id] - job_id_pairs = list(map(JobIdPair, job_ids, scheduler_ids)) - - with self.assertRaisesRegex( - expected_exception=ValueError, - expected_regex=f"Provided a bad job_id_pair, missing job_id for {job3.scheduler_id}", - ): - self.getMongoUtil().update_jobs_to_queued(job_id_pairs) - def test_get_by_cluster(self): """Get a job by its condor scheduler_id""" mongo_util = self.getMongoUtil() @@ -347,113 +278,6 @@ def test_connection_ok(self): mongo_util.get_job(job_id=j.id).delete() self.assertEqual(ori_job_count, Job.objects.count()) - def test_insert_one_ok(self): - mongo_util = self.getMongoUtil() - - with mongo_util.pymongo_client( - self.config["mongo-jobs-collection"] - ) as pymongo_client: - col = pymongo_client[self.config["mongo-database"]][ - self.config["mongo-jobs-collection"] - ] - - ori_job_count = col.count_documents({}) - doc = {"test_key": "foo"} - job_id = mongo_util.insert_one(doc) - self.assertEqual(ori_job_count, col.count_documents({}) - 1) - - result = list(col.find({"_id": ObjectId(job_id)}))[0] - self.assertEqual(result["test_key"], "foo") - - col.delete_one({"_id": ObjectId(job_id)}) - self.assertEqual(col.count_documents({}), ori_job_count) - - def test_find_in_ok(self): - mongo_util = self.getMongoUtil() - - with mongo_util.pymongo_client( - self.config["mongo-jobs-collection"] - ) as pymongo_client: - col = pymongo_client[self.config["mongo-database"]][ - self.config["mongo-jobs-collection"] - ] - - ori_job_count = col.count_documents({}) - doc = {"test_key_1": "foo", "test_key_2": "bar"} - job_id = mongo_util.insert_one(doc) - self.assertEqual(ori_job_count, col.count_documents({}) - 1) - - # test query empty field - elements = ["foobar"] - docs = mongo_util.find_in(elements, "test_key_1") - self.assertEqual(docs.count(), 0) - - # test query "foo" - elements = ["foo"] - docs = mongo_util.find_in(elements, "test_key_1") - self.assertEqual(docs.count(), 1) - doc = docs.next() - self.assertTrue("_id" in doc.keys()) - self.assertTrue(doc.get("_id"), job_id) - self.assertEqual(doc.get("test_key_1"), "foo") - - col.delete_one({"_id": ObjectId(job_id)}) - self.assertEqual(col.count_documents({}), ori_job_count) - - def test_update_one_ok(self): - mongo_util = self.getMongoUtil() - - with mongo_util.pymongo_client( - self.config["mongo-jobs-collection"] - ) as pymongo_client: - col = pymongo_client[self.config["mongo-database"]][ - self.config["mongo-jobs-collection"] - ] - - ori_job_count = col.count_documents({}) - doc = {"test_key_1": "foo"} - job_id = mongo_util.insert_one(doc) - self.assertEqual(ori_job_count, col.count_documents({}) - 1) - - elements = ["foo"] - docs = mongo_util.find_in(elements, "test_key_1") - self.assertEqual(docs.count(), 1) - doc = docs.next() - self.assertTrue("_id" in doc.keys()) - self.assertTrue(doc.get("_id"), job_id) - self.assertEqual(doc.get("test_key_1"), "foo") - - mongo_util.update_one({"test_key_1": "bar"}, job_id) - - elements = ["foo"] - docs = mongo_util.find_in(elements, "test_key_1") - self.assertEqual(docs.count(), 0) - - elements = ["bar"] - docs = mongo_util.find_in(elements, "test_key_1") - self.assertEqual(docs.count(), 1) - - col.delete_one({"_id": ObjectId(job_id)}) - self.assertEqual(col.count_documents({}), ori_job_count) - - def test_delete_one_ok(self): - mongo_util = MongoUtil(self.config) - with mongo_util.pymongo_client(self.config["mongo-jobs-collection"]) as pc: - col = pc.get_database(self.config["mongo-database"]).get_collection( - self.config["mongo-jobs-collection"] - ) - - doc_count = col.count_documents({}) - logging.info("Found {} documents".format(doc_count)) - - doc = {"test_key_1": "foo", "test_key_2": "bar"} - job_id = mongo_util.insert_one(doc) - - self.assertEqual(col.count_documents({}), doc_count + 1) - logging.info("Assert 0 documents") - mongo_util.delete_one(job_id) - self.assertEqual(col.count_documents({}), doc_count) - def test_get_job_log_pymongo_ok(self): mongo_util = self.getMongoUtil() @@ -465,20 +289,17 @@ def test_get_job_log_pymongo_ok(self): jl.stored_line_count = 0 jl.lines = [] - with mongo_util.pymongo_client( - self.config["mongo-jobs-collection"] - ) as pymongo_client: - jl_col = pymongo_client[self.config["mongo-database"]][ - self.config["mongo-logs-collection"] - ] + jl_col = mongo_util.pymongoc[self.config["mongo-database"]][ + self.config["mongo-logs-collection"] + ] - ori_jl_count = jl_col.count_documents({}) + ori_jl_count = jl_col.count_documents({}) - jl.save() # save job log + jl.save() # save job log - self.assertEqual(JobLog.objects.count(), ori_jl_count + 1) - job_log = mongo_util.get_job_log_pymongo(str(primary_key)) + self.assertEqual(JobLog.objects.count(), ori_jl_count + 1) + job_log = mongo_util.get_job_log_pymongo(str(primary_key)) - self.assertEqual(job_log.get("original_line_count"), 0) - self.assertEqual(job_log.get("stored_line_count"), 0) - self.assertIsNone(job_log.get("lines")) + self.assertEqual(job_log.get("original_line_count"), 0) + self.assertEqual(job_log.get("stored_line_count"), 0) + self.assertIsNone(job_log.get("lines")) diff --git a/test/tests_for_db/ee2_model_test.py b/test/tests_for_db/ee2_model_test.py index e1747aa20..c6ecc2871 100644 --- a/test/tests_for_db/ee2_model_test.py +++ b/test/tests_for_db/ee2_model_test.py @@ -33,28 +33,28 @@ def setUpClass(cls): def test_insert_job(self): logging.info("Testing insert job") - with self.mongo_util.mongo_engine_connection(), self.mongo_util.pymongo_client( - self.config["mongo-jobs-collection"] - ) as pc: - job = get_example_job() - job.save() - logging.info(f"Inserted {job.id}") - - logging.info(f"Searching for {job.id}") - db = self.config["mongo-database"] - coll = self.config["mongo-jobs-collection"] - saved_job = pc[db][coll].find_one({"_id": ObjectId(job.id)}) - logging.info("Found") - logging.info(saved_job) - - print(job.wsid) - print(saved_job["wsid"]) - self.assertEqual(job.wsid, saved_job["wsid"]) - self.assertEqual( - job.job_input.narrative_cell_info.cell_id, - saved_job["job_input"]["narrative_cell_info"]["cell_id"], - ) + job = get_example_job() + job.save() + + logging.info(f"Inserted {job.id}") + + logging.info(f"Searching for {job.id}") + db = self.config["mongo-database"] + coll = self.config["mongo-jobs-collection"] + saved_job = self.mongo_util.pymongoc[db][coll].find_one( + {"_id": ObjectId(job.id)} + ) + logging.info("Found") + logging.info(saved_job) + + print(job.wsid) + print(saved_job["wsid"]) + self.assertEqual(job.wsid, saved_job["wsid"]) + self.assertEqual( + job.job_input.narrative_cell_info.cell_id, + saved_job["job_input"]["narrative_cell_info"]["cell_id"], + ) def test_insert_log(self): """