diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 68376cfdd..750cb8cb9 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,6 +1,10 @@
 # execution_engine2 (ee2) release notes
 =========================================
 
+## 0.0.7
+* Fixed a bug that could cause missing `queued` timestamps if many jobs were submitted in a
+  batch
+
 ## 0.0.6
 * Release of MVP
 
diff --git a/docs/adrs/004-SplitAndAggregate.md b/docs/adrs/004-SplitAndAggregate.md
new file mode 100644
index 000000000..1bbf641e1
--- /dev/null
+++ b/docs/adrs/004-SplitAndAggregate.md
@@ -0,0 +1,190 @@
+# Replace KBParallel with another solution to avoid deadlocks
+
+Date: 2021-09-22
+
+[Related ADR](https://github.com/kbase/execution_engine2/blob/develop/docs/adrs/rework-batch-analysis-architecture.md)
+
+## Note
+This ADR is more a place to keep the current discussions from https://docs.google.com/document/d/1AWjayMoqCoGkpO9-tjXxEvO40yYnFtcECbdne5vTURo
+than a record of a decision. There is still more planning, scoping and testing to do before we can fully design this system.
+
+Still to be determined (not in scope of this ADR):
+* UI and how it relates to the bulk execution
+* XSV analysis and how it relates to the bulk execution
+
+
+## Intro
+Sometimes a calculation requires too many resources (walltime, memory, disk) for one node, so the calculation gets spread across multiple machines.
+The final step of an app that uses KBParallel is to create a report, and this step may use results from all of the computed jobs to create the final report.
+To do this, the following apps use a mechanism called KBParallel:
+* kb_Bowtie2
+* refseq_importer
+* kb_concoct
+* kb_phylogenomics
+* kb_hisat2
+* kb_meta_decoder
+* kb_Bwa
+
+## The current implementation of Batch Analysis in [kb_BatchApp](https://github.com/kbaseapps/kb_BatchApp) at KBase has the following issues:
+
+* The current UI is not adequate: users shouldn't have to write code in order to run batch analysis, and doing so is difficult even for those familiar with KBase code (they have to find object names).
+* Dependency on [KBParallel](https://github.com/kbaseapps/KBParallel): any changes to KBParallel could affect kb_BatchApp and subsequently all other apps.
+* Queue deadlocking: users have a maximum of 10 slots in the queue. With the current implementation, one management job is created to manage the jobs that it submits. This can lead to deadlock: up to 10 management jobs may be waiting to submit computation jobs, but they cannot, as all slots are already used up.
+* KBParallel (KBP) can spawn other KBP jobs, and batch jobs can spawn other batch jobs.
+* Missing the ability to run, manage (cancel) and track jobs and their subjobs, along with the ability to specify resources differently for the main job and the sub jobs.
+* No good way to test, and hard to benchmark or measure performance.
+* Code is split more than is necessary.
+* The UI doesn't properly display the progress of batch jobs.
+
+## Author(s)
+
+@bio-boris, @mrcreosote
+
+## Status
+Needs more planning, but current ideas are documented here.
+
+
+## Decision Outcome (pending more research to iron out more details)
+
+For the first pass, we would likely limit the number of KBParallel runs.
+
+For the next pass, we would want to create a comprehensive, generalized solution to submit, split and aggregate, with recipes or conveniences for common operations such as creating sets and reports.
+
+We would also want to do a user study on what we want from the UI and which functionality we need, as the UI may inform the design of the backend system.
+
+
+### Deprecate KBP and instead break out apps into 3 parts
+
+* Fan out (FO)
+* Process in parallel (PIP)
+* Fan in (FI)
+
+
+### Steps:
+1. The user launches a job as normal.
+2. Possibly the job is marked as a FO job. This makes it easier for the UI to display the job correctly from the start. Ideally it would be marked in the spec, but that might be a lot of work; it could potentially be marked in the catalog UI instead (e.g. along with the job requirements).
+3. The job figures out what the PIP / sub jobs should be.
+4. The job sends the following info to EE2:
+   * Its own job ID
+   * The parameters for each of the sub jobs
+   * The app of the FI job, e.g. kb_phylogenomics/build_microbial_speciestree_reduce
+   * EE2 starts the subjobs and associates them with the FO job (we probably need retry handling for the subjobs to deal with transient errors)
+5. Whenever a subjob finishes, EE2 checks to see if all the subjobs are finished.
+   * If they are, EE2 starts the FI job, providing the outputs of the subjobs as a list to the reduce job.
+   * When the FI job is finished, the job is done.
+   * The various jobs can communicate by storing temporary data in the caching service or in the Blobstore. If the latter is used, the FI job should clean up the Blobstore nodes when it's complete.
+   * Could we make a helper app for this?
+   * What about workflow engines (WDL, Cromwell)? Are we reinventing the wheel here?
+   * Can new EE2 endpoints speed up or reduce the complexity of any of these steps?
+
+### Notes about DAGs in ee2 Endpoints
+```
+Your dag would need to have (at least) a first job followed by a SUBDAG EXTERNAL.
+Somewhere in the first job you'd generate a new dag workflow that
+defines the N clusters followed by the N+1 job, which runs in the
+subdag.
+
+As for DAGMan support in the Python bindings, we do this in the
+following two ways:
+
+1) There is a htcondor.Submit.from_dag() option which takes the name
+of a dag filename. You then submit the resulting object just like any
+regular job.
+2) We have a htcondor.dags library which can be used to
+programmatically construct a DAG workflow in computer memory, then
+write to a .dag file and submit using the function mentioned in 1)
+above.
+```
+
+Between these there are several different ways to do what you want.
+
+There's a useful example here that shows the general workflow in the
+bindings: https://htcondor.readthedocs.io/en/latest/apis/python-bindings/tutorials/DAG-Creation-And-Submission.html#Describing-the-DAG-using-htcondor.dags
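+
+As a rough illustration of option 2) in the notes above, the sketch below uses `htcondor.dags` to build a
+two-layer fan-out / fan-in DAG and submits it via `htcondor.Submit.from_dag()`. It is adapted from the
+linked tutorial rather than taken from EE2 code; the submit descriptions, file names and job count are
+placeholders.
+
+```python
+from pathlib import Path
+
+import htcondor
+from htcondor import dags
+
+# Placeholder submit descriptions; real PIP / FI jobs would run app containers instead.
+pip_sub = htcondor.Submit({
+    "executable": "/bin/echo",
+    "arguments": "sub job $(part)",
+    "log": "pip.log",
+    "output": "pip-$(part).out",
+    "error": "pip-$(part).err",
+})
+fi_sub = htcondor.Submit({
+    "executable": "/bin/echo",
+    "arguments": "fan in",
+    "log": "fi.log",
+    "output": "fi.out",
+    "error": "fi.err",
+})
+
+dag = dags.DAG()
+# Fan out: one node per sub job, driven by per-node VARS.
+pip_layer = dag.layer(
+    name="pip",
+    submit_description=pip_sub,
+    vars=[{"part": str(i)} for i in range(3)],
+)
+# Fan in: a single child node that only starts once every fan-out node succeeds.
+pip_layer.child_layer(name="fi", submit_description=fi_sub)
+
+dag_dir = Path("fanout-dag")
+dag_dir.mkdir(exist_ok=True)
+dag_file = dags.write_dag(dag, dag_dir)
+
+# Submit the DAG like any regular job, as described in option 1) of the notes.
+dag_submit = htcondor.Submit.from_dag(str(dag_file), {"force": 1})
+htcondor.Schedd().submit(dag_submit)  # htcondor >= 9 Python bindings assumed
+```
+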
+## Consequences
+
+* We will have to implement a new narrative UI; however, this is work that would happen regardless, as we are looking to improve the UX for batch upload and analysis at KBase.
+* This will take significant time to further research and engineer the solutions.
+
+Still to be determined (not in scope of this ADR):
+* UI and how it relates to the bulk execution
+* XSV analysis and how it relates to the bulk execution
+
+## Alternatives Considered
+
+* Ignore most issues and just limit apps that run KBParallel to N instances of KBParallel per user to avoid deadlocks
+* Remove KBParallel and change apps into a collection of 2-3 apps that do submit, split and aggregate, and use an ee2 endpoint to create a DAG
+* Different DevOps solutions
+* Rewrite KBP or swap it out for a lightweight alternative that has a subset of the KBP features
+
+
+## Pros and Cons of the Alternatives
+
+### General Notes
+* With the current implementation of KBP, having a separate KBP queue with multiple machines can save a spot from a user's 10 job maximum for running more jobs, but it takes up / wastes compute resources (especially if the nodes sit idle). The user still gets 10 jobs, but there are fewer spots for jobs to run overall in the system if we make another queue, as this requires taking up more compute nodes that are currently dedicated to the NJS queue.
+* Without changing the apps that use KBP, running multiple KBP apps on the same machine can cause them to interfere with each other, and we want to avoid this.
+* If we scrap KBP in favor of a "lightweight alternative" we can avoid some of the previous issues, provided we modify all apps that use KBP to use the lightweight alternative. A lightweight alternative would have to guarantee that no computation besides job management occurred; then we could have the management jobs sit and wait for other jobs without interfering with other jobs on the system.
+
+### Increase number of slots per user > 10
+* `+` Simple solution, quick turnaround; fixes the deadlock issue for small numbers of jobs.
+* `-` Doesn't fix the deadlock issue, as the user can still submit more KBP jobs.
+* `-` Addresses only the deadlocking issue; the UI is still broken for regular runs and batch runs.
+* `-` A small number of users can take over the entire system if they can submit more than 10 jobs each.
+* `-` More than 10 nodes will continue to be taken up by jobs that do little computation, as each job gets its own node.
+* `-` Capacity is still wasted, as some KBP jobs sit around waiting for other jobs to run.
+
+### LIMIT KBP jobs to a maximum of N<10 active KBP jobs per user
+* `+` Simple solution: requires ee2 to maintain a list of KBP apps and add a KBP_LIMIT to jobs from this list. [Condor](https://github.com/kbase/condor/pull/26) will need KBP_LIMIT added (see the sketch after this list).
+* `+` The list of apps is not frequently updated.
+* `+` Apps do not need to be modified.
+* `-` If a new app uses KBP and is not on the list, it won't be limited by the KBP_LIMIT unless the owner lets us know.
+* `-` If an existing app no longer uses KBP, it is still limited unless the owner lets us know.
+* `-` Nodes will continue to be taken up by jobs that do little computation, as each job gets its own node.
+* `-` Users may not be able to effectively use up their 10 job spots.
+* `-` Capacity is still wasted, as some KBP jobs sit around waiting for other jobs to run.
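+
+The sketch below shows how a KBP_LIMIT could be attached to management jobs through HTCondor
+concurrency limits, using the Python bindings. It is illustrative only: the per-user limit naming
+scheme and the reliance on `CONCURRENCY_LIMIT_DEFAULT` in the pool configuration are assumptions
+about how the kbase/condor setup might work, not existing EE2 behaviour.
+
+```python
+import htcondor
+
+
+def submit_kbp_management_job(username: str) -> int:
+    """Submit a KBP management job that counts against a per-user concurrency limit."""
+    sub = htcondor.Submit({
+        "executable": "/bin/sleep",  # stand-in for the real management job
+        "arguments": "600",
+        "log": "kbp_manager.log",
+        # Each running job consumes one unit of the named limit. Limit names without an
+        # explicit <NAME>_LIMIT config entry fall back to CONCURRENCY_LIMIT_DEFAULT, so
+        # setting CONCURRENCY_LIMIT_DEFAULT = 5 on the negotiator would cap each user at
+        # 5 concurrently running KBP management jobs.
+        "concurrency_limits": f"kbp_{username}",
+    })
+    result = htcondor.Schedd().submit(sub)  # htcondor >= 9 Python bindings assumed
+    return result.cluster()
+```
+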
+### LIMIT KBP jobs to a maximum of N<10 active jobs per user + separate queue for KBParallel apps
+* `+` Same pros as above.
+* `+` Users will be able to use their 10 job spots more effectively.
+* `+` Allows us to group KBP jobs onto fewer machines, instead of giving each its own node.
+* `-` Requires going through each app and understanding its worst-case computational needs in order to set the estimated CPU and memory requirements for each app.
+* `-` Apps can interfere with other, innocent apps and take them down.
+* `-` Creating a new queue requires balancing how many nodes are active for KBP vs. how many nodes are available for other NJS jobs.
+* `-` Capacity is still wasted, as some KBP jobs sit around waiting for other jobs to run.
+
+
+### Build KBP Lightweight Version + KBP Queue
+
+
+#### Design of the new version
+
+
+* All apps must be modified to use the new KBP lightweight version, which will follow the steps below (a rough sketch of the manager loop follows the Pros/Cons list).
+* We can either modify KBP or create a new tool/package to use instead of KBP.
+
+
+1) Launch a management job called the *Job Manager* that sits in the KBP Queue, alongside other KBP jobs. Other jobs are launched in the NJS queue.
+2) Launch the *Setup Job*, which will
+   * Use the *User Parameters*, and/or
+   * Optionally download the data referenced by the *User Parameters* to figure out the *Job Manager* parameters
+   * Use the information gathered from the initial download and/or the *User Parameters*
+   * Generate the final parameters to be sent to the *Job Manager* to launch *Fan Out* jobs, or directly launch *Fan Out* jobs and return job ids
+3) The *Job Manager* now has enough parameters to launch and/or monitor *Fan Out* jobs, and to manage them (and possibly retry them upon failure)
+4) *Fan Out* jobs download data, perform calculations, save the results back to the system, and return references to the saved objects
+5) The *Job Manager* launches one *FanIn* job based on the *User Parameters* and/or the results of the *Fan Out* jobs
+6) The *FanIn* (a.k.a. Group/Reduce/Report) job downloads objects from the system, creates a set or other grouping, and then saves the object(s) back to the system
+7) The final data and report are uploaded back to the system
+8) The *Job Manager* returns the reference to the results of the *Report Job*
+
+Pros/Cons
+
+* `+` All KBP jobs can run on a small subset of machines; the deadlock issue is fixed.
+* `+` No changes to ee2 required.
+* `-` Addresses the deadlocking issue only; the UI is still broken for regular runs and batch runs if we re-use KBP.
+* `-` Apps that use KBP would have to be rewritten to use this new paradigm on an as-needed basis.
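+
+A rough sketch of the *Job Manager* loop from steps 1-8 above is shown below. This is not EE2 code:
+the `ee2` client object, the method names `run_job` / `check_jobs`, and the shapes of the status
+records are simplified stand-ins for whatever interface the real implementation would use.
+
+```python
+import time
+
+
+def job_manager(ee2, fan_out_params: list, fan_in_app: str, poll_seconds: int = 60) -> str:
+    """Launch the Fan Out jobs, wait for them all, then launch the single FanIn job."""
+    # Step 3: fan out, one job per parameter set produced by the Setup Job.
+    fan_out_ids = [ee2.run_job(params) for params in fan_out_params]
+
+    # Steps 3-4: monitor the Fan Out jobs; retry handling for transient errors would go here.
+    while True:
+        states = ee2.check_jobs(fan_out_ids)
+        if any(s["status"] == "error" for s in states):
+            raise RuntimeError("a Fan Out job failed; retry it or cancel the batch here")
+        if all(s["status"] == "completed" for s in states):
+            break
+        time.sleep(poll_seconds)
+
+    # Steps 5-8: one FanIn job groups the saved objects and writes the final report.
+    object_refs = [s["job_output"] for s in states]
+    return ee2.run_job({"method": fan_in_app, "params": [{"input_refs": object_refs}]})
+```
+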
+### Modify Apps to do only local submission by removing KBP and moving the job
+* `+` Simple solution, quick turnaround; fixes the deadlock issue and the UI issues.
+* `-` We have a limited number of larger-resource machines.
+* `-` Continued dependency on deprecated KBP tools.
+* `-` App runs may take longer, since fewer resources may be available to the app run.
diff --git a/kbase.yml b/kbase.yml
index 0cee4a309..15f81f845 100644
--- a/kbase.yml
+++ b/kbase.yml
@@ -8,7 +8,7 @@ service-language: python
 
 module-version:
-    0.0.5
+    0.0.7
 
 owners: [bsadkhin, tgu2, wjriehl, gaprice]
 
diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py
index 349b066bc..f71d53cc1 100644
--- a/lib/execution_engine2/db/MongoUtil.py
+++ b/lib/execution_engine2/db/MongoUtil.py
@@ -3,8 +3,7 @@
 import time
 import traceback
 from contextlib import contextmanager
-from datetime import datetime
-from typing import Dict, List
+from typing import Dict, List, NamedTuple
 from bson.objectid import ObjectId
 from mongoengine import connect, connection
 from pymongo import MongoClient, UpdateOne
@@ -17,7 +16,11 @@
 )
 
 from lib.execution_engine2.utils.arg_processing import parse_bool
-from execution_engine2.sdk.EE2Runjob import JobIdPair
+
+
+class JobIdPair(NamedTuple):
+    job_id: str
+    scheduler_id: str
 
 
 class MongoUtil:
@@ -272,6 +275,42 @@ def check_if_already_finished(job_status):
             return True
         return False
 
+    def update_job_to_queued(
+        self, job_id: str, scheduler_id: str, scheduler_type: str = "condor"
+    ) -> None:
+        f"""
+        * Updates a {Status.created.value} job to queued and sets scheduler state.
+        Always sets scheduler state, but will only update to queued if the job is in the
+        {Status.created.value} state.
+        :param job_id: the ID of the job.
+        :param scheduler_id: the scheduler's job ID for the job.
+        :param scheduler_type: The scheduler this job was queued in, default condor
+        """
+        if not job_id or not scheduler_id or not scheduler_type:
+            raise ValueError("None of the 3 arguments can be falsy")
+        # could also test that the job ID is a valid job ID rather than having mongo throw an
+        # error
+        mongo_collection = self.config["mongo-jobs-collection"]
+        queue_time_now = time.time()
+        with self.pymongo_client(mongo_collection) as pymongo_client:
+            ee2_jobs_col = pymongo_client[self.mongo_database][mongo_collection]
+            # should we check that the job was updated and do something if it wasn't?
+            ee2_jobs_col.update_one(
+                {"_id": ObjectId(job_id), "status": Status.created.value},
+                {"$set": {"status": Status.queued.value, "queued": queue_time_now}},
+            )
+            # originally had a single query, but seems safer to always record the scheduler
+            # state no matter the state of the job
+            ee2_jobs_col.update_one(
+                {"_id": ObjectId(job_id)},
+                {
+                    "$set": {
+                        "scheduler_id": scheduler_id,
+                        "scheduler_type": scheduler_type,
+                    }
+                },
+            )
+
     def update_jobs_to_queued(
         self, job_id_pairs: List[JobIdPair], scheduler_type: str = "condor"
     ) -> None:
@@ -285,7 +324,7 @@ def update_jobs_to_queued(
         bulk_update_scheduler_jobs = []
         bulk_update_created_to_queued = []
 
-        queue_time_now = datetime.utcnow().timestamp()
+        queue_time_now = time.time()
         for job_id_pair in job_id_pairs:
             if job_id_pair.job_id is None:
                 raise ValueError(
diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py
index 5b6366de5..2be7e6813 100644
--- a/lib/execution_engine2/execution_engine2Impl.py
+++ b/lib/execution_engine2/execution_engine2Impl.py
@@ -28,7 +28,7 @@ class execution_engine2:
     # state. A method could easily clobber the state set by another while
    # the latter method is running.
     ######################################### noqa
-    VERSION = "0.0.5"
+    VERSION = "0.0.7"
     GIT_URL = "https://github.com/mrcreosote/execution_engine2.git"
     GIT_COMMIT_HASH = "2ad95ce47caa4f1e7b939651f2b1773840e67a8a"
 
diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py
index ec6d3952c..2beed9a7a 100644
--- a/lib/execution_engine2/sdk/EE2Runjob.py
+++ b/lib/execution_engine2/sdk/EE2Runjob.py
@@ -75,11 +75,6 @@ class PreparedJobParams(NamedTuple):
     job_id: str
 
 
-class JobIdPair(NamedTuple):
-    job_id: str
-    scheduler_id: str
-
-
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
@@ -312,17 +307,11 @@ def _run_multiple(self, runjob_params: List[Dict]):
             ).start()
         return job_ids
 
-    def _update_to_queued_multiple(self, job_ids, scheduler_ids):
+    def _finish_multiple_job_submission(self, job_ids):
         """
        This is called during job submission. If a job is terminated during job submission,
        we have the chance to re-issue a termination and remove the job from the Job Queue
        """
-        if len(job_ids) != len(scheduler_ids):
-            raise Exception(
-                "Need to provide the same amount of job ids and scheduler_ids"
-            )
-        jobs_to_update = list(map(JobIdPair, job_ids, scheduler_ids))
-        self.sdkmr.get_mongo_util().update_jobs_to_queued(jobs_to_update)
         jobs = self.sdkmr.get_mongo_util().get_jobs(job_ids)
 
         for job in jobs:
@@ -377,14 +366,21 @@ def _submit_multiple(self, job_submission_params):
                 )
                 raise RuntimeError(error_msg)
             condor_job_ids.append(condor_job_id)
-
-        self.logger.error(f"It took {time.time() - begin} to submit jobs to condor")
-        # It took 4.836009502410889 to submit jobs to condor
+            # Previously the jobs were updated in a batch after submitting all jobs to condor.
+            # This led to issues where a large job count could result in jobs switching to
+            # running prior to all jobs being submitted and so the queued timestamp was
+            # never added to the job record.
+            self.sdkmr.get_mongo_util().update_job_to_queued(job_id, condor_job_id)
+
+        self.logger.error(
+            f"It took {time.time() - begin} to submit jobs to condor and update to queued"
+        )
 
         update_time = time.time()
-        self._update_to_queued_multiple(job_ids=job_ids, scheduler_ids=condor_job_ids)
-        # It took 1.9239885807037354 to update jobs
-        self.logger.error(f"It took {time.time() - update_time} to update jobs ")
+        self._finish_multiple_job_submission(job_ids=job_ids)
+        self.logger.error(
+            f"It took {time.time() - update_time} to finish job submission"
+        )
 
         return job_ids
 
diff --git a/test/tests_for_db/ee2_MongoUtil_test.py b/test/tests_for_db/ee2_MongoUtil_test.py
index 9591f16a1..32933e99b 100644
--- a/test/tests_for_db/ee2_MongoUtil_test.py
+++ b/test/tests_for_db/ee2_MongoUtil_test.py
@@ -1,18 +1,20 @@
 # -*- coding: utf-8 -*-
 import logging
 import os
+import time
 import unittest
-from datetime import datetime
 
+from pytest import raises
 from bson.objectid import ObjectId
 
-from execution_engine2.db.MongoUtil import MongoUtil
+from execution_engine2.db.MongoUtil import MongoUtil, JobIdPair
 from execution_engine2.db.models.models import Job, JobLog, Status
-from execution_engine2.sdk.EE2Runjob import JobIdPair
 from test.utils_shared.test_utils import (
     bootstrap,
     get_example_job,
     read_config_into_dict,
+    assert_exception_correct,
+    assert_close_to_now,
 )
 from tests_for_db.mongo_test_helper import MongoTestHelper
 
@@ -70,6 +72,40 @@ def test_insert_jobs(self):
         for i, retrieved_job in enumerate(retrieved_jobs):
             assert jobs_to_insert[i].to_json() == retrieved_job.to_json()
 
+    def test_update_job_to_queued_fail_with_bad_args(self):
+        jid = "aaaaaaaaaaaaaaaaaaaaaaaa"
+        err = ValueError("None of the 3 arguments can be falsy")
+        self.update_job_to_queued_fail(None, "sid", "sch", err)
+        self.update_job_to_queued_fail("", "sid", "sch", err)
+        self.update_job_to_queued_fail(jid, None, "sch", err)
+        self.update_job_to_queued_fail(jid, "", "sch", err)
+        self.update_job_to_queued_fail(jid, "sid", None, err)
+        self.update_job_to_queued_fail(jid, "sid", "", err)
+
+    def update_job_to_queued_fail(self, job_id, schd_id, schd, expected):
+        with raises(Exception) as got:
+            self.getMongoUtil().update_job_to_queued(job_id, schd_id, schd)
+        assert_exception_correct(got.value, expected)
+
+    def test_update_job_to_queued(self):
+        for state in Status:
+            j = get_example_job(status=state.value)
+            j.scheduler_id = None
+            j.save()
+            assert j.scheduler_id is None
+
+            self.getMongoUtil().update_job_to_queued(j.id, "schdID", "condenast")
+            j.reload()
+            assert_close_to_now(j.updated)
+            assert j.scheduler_id == "schdID"
+            assert j.scheduler_type == "condenast"
+            if state == Status.created:
+                assert_close_to_now(j.queued)
+                assert j.status == Status.queued.value
+            else:
+                assert j.queued is None
+                assert j.status == state.value
+
     def test_update_jobs_enmasse(self):
         """Check to see that created jobs get updated to queued"""
         for state in Status:
@@ -87,8 +123,7 @@ def test_update_jobs_enmasse(self):
 
         scheduler_ids = ["humpty", "dumpty", "alice"]
         jobs_to_update = list(map(JobIdPair, job_ids, scheduler_ids))
-        now_ms = datetime.utcnow().timestamp()
-
+        now_ms = time.time()
         self.getMongoUtil().update_jobs_to_queued(jobs_to_update)
         job.reload()
         job2.reload()
diff --git a/test/tests_for_sdkmr/EE2Runjob_test.py b/test/tests_for_sdkmr/EE2Runjob_test.py
index 6dec768bf..5d2c42f85 100644
--- a/test/tests_for_sdkmr/EE2Runjob_test.py
+++ b/test/tests_for_sdkmr/EE2Runjob_test.py
@@ -27,7 +27,7 @@
     AuthError,
     InvalidParameterForBatch,
 )
-from execution_engine2.sdk.EE2Runjob import EE2RunJob, JobPermissions, JobIdPair
+from execution_engine2.sdk.EE2Runjob import EE2RunJob, JobPermissions
 from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner
 from execution_engine2.sdk.job_submission_parameters import (
     JobSubmissionParameters,
@@ -908,12 +908,13 @@ def _check_common_mock_calls_batch(
     )
 
     # update to queued state
-    child_job_pairs = [
-        JobIdPair(_JOB_ID_1, _CLUSTER_1),
-        JobIdPair(_JOB_ID_2, _CLUSTER_2),
-    ]
-    mocks[MongoUtil].update_jobs_to_queued.assert_has_calls([call(child_job_pairs)])
-    job_ids = [child_job_pair.job_id for child_job_pair in child_job_pairs]
+    mocks[MongoUtil].update_job_to_queued.assert_has_calls(
+        [
+            call(_JOB_ID_1, _CLUSTER_1),
+            call(_JOB_ID_2, _CLUSTER_2),
+        ]
+    )
+    job_ids = [_JOB_ID_1, _JOB_ID_2]
     mocks[MongoUtil].get_jobs.assert_has_calls([call(job_ids)])
 
     if not terminated_during_submit:
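
For reference, the snippet below is a standalone sketch (not part of the patch) of the two-step update pattern that `update_job_to_queued` uses. It shows why filtering on `status == "created"` keeps a job that has already moved to `running` from being pushed back to `queued` (the race described in the 0.0.7 release note), while the scheduler fields are still recorded. It assumes a local MongoDB at `localhost:27017`; the database and collection names are made up for the demo.

```python
import time

from pymongo import MongoClient

col = MongoClient("localhost", 27017)["ee2_demo"]["jobs"]
job = col.insert_one({"status": "running", "queued": None})  # already past "created"

# Step 1: only flips created -> queued, so it matches nothing for this running job.
col.update_one(
    {"_id": job.inserted_id, "status": "created"},
    {"$set": {"status": "queued", "queued": time.time()}},
)
# Step 2: always records where the scheduler put the job, whatever its current status.
col.update_one(
    {"_id": job.inserted_id},
    {"$set": {"scheduler_id": "12345", "scheduler_type": "condor"}},
)
print(col.find_one({"_id": job.inserted_id}))  # status stays "running", queued stays None
```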