From 8a21293c6644ebc89530ce90e3085d677aa5ee80 Mon Sep 17 00:00:00 2001
From: bio-boris
Date: Wed, 7 Jun 2023 17:16:41 -0500
Subject: [PATCH] D->M (#463)

0.0.12
Forcing black to 22.1.0 to make sure that GHA doesn't suddenly fail
Prevent jobs that never ran from submitting job execution stats

0.0.11
Add ability for kbase user to contact condor via token

0.0.10
Fixes bug with ee2 not recording all jobs with the catalog during the process of finishing a job
Updates GHA with black and flake8
Fix flake8 and black formatting issues by formatting MANY files
Updated docs for installing htcondor
Update many python libs in requirements.txt
---
 .flake8                                       |  14 -
 .github/workflows/ee2-tests.yml               |  84 +++--
 .github/workflows/pr_build.yml                |   2 +-
 .pre-commit-config.yaml                       |   4 +-
 Dockerfile                                    |   4 +-
 Makefile                                      |   3 +-
 Pipfile                                       |   2 +-
 Pipfile.lock                                  |  22 +-
 README.md                                     |   6 +-
 RELEASE_NOTES.md                              |  17 +
 execution_engine2.html                        |   2 +-
 .../authorization/authstrategy.py             |   2 +-
 lib/execution_engine2/db/models/models.py     |  12 +-
 .../transfer/check_jobs_for_finish_times.py   |   6 +-
 .../transfer/check_njs_jobs_for_ujs_id.py     |  12 +-
 .../models/transfer/fix_transfer_ujs_njs.py   |  12 +-
 .../execution_engine2Impl.py                  | 293 ++++++++++--------
 .../execution_engine2Server.py                |  16 +-
 lib/execution_engine2/sdk/EE2Runjob.py        |  10 +-
 lib/execution_engine2/sdk/EE2Status.py        |  16 +-
 lib/execution_engine2/utils/SlackUtils.py     |   5 +-
 lib/execution_engine2/utils/slack_utils.py    |   5 +-
 requirements-dev.txt                          |   6 +-
 requirements.txt                              |  10 +-
 scripts/entrypoint.sh                         |   8 +
 .../defunct_tests/ee2_catalog_test.py         |   3 +-
 .../ee2_scheduler_integration_test.py         |   2 +-
 .../manual_tests/ee2_scheduler_online_test.py |  14 +-
 test/tests_for_db/ee2_MongoUtil_test.py       |   5 +-
 test/tests_for_integration/api_to_db_test.py  |   7 +-
 test/tests_for_sdkmr/EE2Status_test.py        |  48 ++-
 .../ee2_SDKMethodRunner_EE2Logs_test.py       |   1 -
 .../ee2_SDKMethodRunner_test.py               |  73 +++--
 ...ee2_SDKMethodRunner_test_EE2Runjob_test.py |   4 +-
 ...ee2_SDKMethodRunner_test_EE2Status_test.py |  13 +-
 .../ee2_SDKMethodRunner_test_utils.py         |   1 -
 test/tests_for_sdkmr/ee2_load_test.py         |   7 +-
 tox.ini                                       |  13 +-
 38 files changed, 445 insertions(+), 319 deletions(-)
 delete mode 100644 .flake8

diff --git a/.flake8 b/.flake8
deleted file mode 100644
index cd50db704..000000000
--- a/.flake8
+++ /dev/null
@@ -1,14 +0,0 @@
-[flake8]
-# https://ljvmiranda921.github.io/notebook/2018/06/21/precommits-using-black-and-flake8/
-ignore = E203, E266, E501, W503, F403, F401 , E402
-max-line-length = 79
-max-complexity = 18
-select = B,C,E,F,W,T4,B9
-exclude =
-    .tox,
-    execution_engine2Impl.py,
-    lib/installed_clients/,
-    lib/execution_engine2/execution_engine2Impl.py,
-    lib/execution_engine2/authclient.py,
-    lib/biokbase/log.py,
-    *Impl.py
diff --git a/.github/workflows/ee2-tests.yml b/.github/workflows/ee2-tests.yml
index 16c264e83..671644388 100644
--- a/.github/workflows/ee2-tests.yml
+++ b/.github/workflows/ee2-tests.yml
@@ -1,4 +1,4 @@
-# This workflow will install Python dependencies, run tests and lint with a single version of Python
+# This workflow will install Python dependencies, run tests and lint
 # For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
 
 # To ssh into this build add the following:
@@ -11,35 +11,61 @@ name: Execution Engine 2 Test Suite
 
 on:
-  [pull_request]
+  [ pull_request ]
 
 jobs:
-  build:
+  Lint_with_Black:
     runs-on: ubuntu-latest
+    name: Lint With Black
     steps:
-      - uses: actions/checkout@v2
-      - name: Set up Python 3.8
-        uses: actions/setup-python@v2
-        with:
-          python-version: 3.8
-      - name: Lint with flake8 and black
-        run: |
-          python -m pip install --upgrade pip
-          pip install flake8 black pytest
-          flake8 ./lib ./test
-          black --check ./lib ./test
-      - name: Install dependencies
-        run: |
-          if [ -f requirements.txt ]; then pip install -r requirements-dev.txt; fi
-          cd /opt
-          git clone https://github.com/kbase/jars
-          cd -
-      - name: Build Docker Image
-        run: |
-          docker build . -t execution_engine2:test
-      - name: Run Tests
-        run: |
-          docker-compose up -d
-          cp test/env/test.travis.env test.env
-          make test-coverage
-          codecov
+      - uses: actions/checkout@v3
+      - uses: psf/black@stable
+        with:
+          options: "--check --verbose"
+          src: "./lib"
+          version: "22.10.0"
+      - uses: psf/black@stable
+        with:
+          options: "--check --verbose"
+          src: "./test"
+          version: "22.10.0"
+
+  Lint_with_Flake8:
+    runs-on: ubuntu-latest
+    name: Lint With Flake8
+    steps:
+      - name: Check out source repository
+        uses: actions/checkout@v3
+      - name: Set up Python environment
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.8"
+      - name: flake8 Lint Lib
+        uses: py-actions/flake8@v2
+        with:
+          path: "./lib"
+      - name: flake8 Lint Test
+        uses: py-actions/flake8@v2
+        with:
+          path: "./test"
+
+
+  Build_and_Run_Tests_and_CodeCov:
+    name: Build and Run Tests and CodeCov
+    runs-on: ubuntu-latest
+    steps:
+      - name: Check out source repository
+        uses: actions/checkout@v3
+      - name: Install dependencies
+        run: |
+          pip install -r requirements.txt
+          git clone https://github.com/kbase/jars /opt/jars
+      - name: Build Docker Image
+        run: |
+          docker build . -t execution_engine2:test
+      - name: Run Tests
+        run: |
+          docker-compose up -d
+          cp test/env/test.travis.env test.env
+          make test-coverage
+          codecov
diff --git a/.github/workflows/pr_build.yml b/.github/workflows/pr_build.yml
index bf5d7e076..0fa1c4643 100644
--- a/.github/workflows/pr_build.yml
+++ b/.github/workflows/pr_build.yml
@@ -10,7 +10,7 @@ on:
       - opened
       - reopened
       - synchronize
-      - merged
+      - closed
 jobs:
   build-develop-open:
     if: github.base_ref == 'develop' && github.event.pull_request.merged == false
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 4556abb64..8468b8c65 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,9 +1,9 @@
 repos:
   - repo: https://github.com/ambv/black
-    rev: 22.1.0
+    rev: 22.10.0
     hooks:
       - id: black
-        exclude: '.+Impl.py'
+        exclude: ".*execution_engine2Impl.py"
   - repo: https://gitlab.com/pycqa/flake8
     rev: '3.9.2'
     hooks:
diff --git a/Dockerfile b/Dockerfile
index 02fcbf9d4..baa46f512 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -61,7 +61,9 @@ COPY ./ /kb/module
 RUN mkdir -p /kb/module/work && chmod -R a+rw /kb/module && mkdir -p /etc/condor/
 WORKDIR /kb/module
 
-RUN make all
+# Due to older kb-sdk in this base image, getting some compilation results we don't want
+# Will have to manually use the correct version of kbase-sdk to compile impl/Server files
+RUN make build
 
 # Remove Jars and old Conda for Trivy Scans and after compilation is done
 RUN rm -rf /sdk && rm -rf /opt
diff --git a/Makefile b/Makefile
index 848d03d5f..0e3ad8c5e 100644
--- a/Makefile
+++ b/Makefile
@@ -21,6 +21,7 @@ compile:
 		--pysrvname $(SERVICE_CAPS).$(SERVICE_CAPS)Server \
 		--pyimplname $(SERVICE_CAPS).$(SERVICE_CAPS)Impl;
 
+
 	kb-sdk compile $(SPEC_FILE) \
 		--out . \
 		--html \
@@ -41,4 +42,4 @@ test-coverage:
 
 build-condor-test-image:
 	cd test/dockerfiles/condor && echo `pwd` && docker build -f Dockerfile . -t $(CONDOR_DOCKER_IMAGE_TAG_NAME)
-	docker push $(CONDOR_DOCKER_IMAGE_TAG_NAME)
\ No newline at end of file
+	docker push $(CONDOR_DOCKER_IMAGE_TAG_NAME)
diff --git a/Pipfile b/Pipfile
index 5de7bfa43..8d437ecec 100644
--- a/Pipfile
+++ b/Pipfile
@@ -15,7 +15,7 @@ cachetools = "==3.1.1"
 certifi = "==2019.6.16"
 cffi = "==1.14.0"
 chardet = "==3.0.4"
-codecov = "==2.0.15"
+codecov = "==2.0.16"
 configparser = "==3.7.4"
 confluent-kafka = "==1.5.0"
 coverage = "==4.5.3"
diff --git a/Pipfile.lock b/Pipfile.lock
index e238e5484..679d5189c 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -1,7 +1,7 @@
 {
     "_meta": {
         "hash": {
-            "sha256": "131a9560c753b4c2a11c262f60221ca37fbf88b824191cfd09e123bd9e63c170"
+            "sha256": "5453309812c28f912f0f5f329065260ada4468ec94aab8f9d76a98132d64065e"
         },
         "pipfile-spec": 6,
         "requires": {
@@ -151,11 +151,11 @@
         },
         "codecov": {
             "hashes": [
-                "sha256:8ed8b7c6791010d359baed66f84f061bba5bd41174bf324c31311e8737602788",
-                "sha256:ae00d68e18d8a20e9c3288ba3875ae03db3a8e892115bf9b83ef20507732bed4"
+                "sha256:38b32934e759a29313382287f59986f25613708f60760c88d31e956399bbeffe",
+                "sha256:4cf93c30cc1ddb6d7414fce0a45816889499c3febc8bbbc24f1cd1936a804087"
             ],
             "index": "pypi",
-            "version": "==2.0.15"
+            "version": "==2.0.16"
         },
         "configparser": {
             "hashes": [
@@ -573,11 +573,11 @@
         },
         "packaging": {
             "hashes": [
-                "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb",
-                "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"
+                "sha256:714ac14496c3e68c99c29b00845f7a2b85f3bb6f1078fd9f72fd20f0570002b2",
+                "sha256:b6ad297f8907de0fa2fe1ccbd26fdaf387f5f47c7275fedf8cce89f99446cf97"
             ],
-            "markers": "python_version >= '3.6'",
-            "version": "==21.3"
+            "markers": "python_version >= '3.7'",
+            "version": "==23.0"
         },
         "pluggy": {
             "hashes": [
@@ -807,11 +807,11 @@
         },
         "setuptools": {
             "hashes": [
-                "sha256:68e45d17c9281ba25dc0104eadd2647172b3472d9e01f911efa57965e8d51a36",
-                "sha256:a43bdedf853c670e5fed28e5623403bad2f73cf02f9a2774e91def6bda8265a7"
+                "sha256:16ccf598aab3b506593c17378473978908a2734d7336755a8769b480906bec1c",
+                "sha256:b440ee5f7e607bb8c9de15259dba2583dd41a38879a7abc1d43a71c59524da48"
             ],
             "markers": "python_version >= '3.7'",
-            "version": "==62.3.2"
+            "version": "==67.2.0"
         },
         "six": {
             "hashes": [
diff --git a/README.md b/README.md
index 3b584efa4..36e976e63 100644
--- a/README.md
+++ b/README.md
@@ -67,11 +67,7 @@ pre-commit uninstall
 ```
 
 ## Installing HTCondor Bindings from the mac
-* You may not be able to load without disabling the mac Security Gatekeeper with `sudo spctl --master-disable`
-* The HTCondor bindings only work on the Python.org install of python or your system install of python2.7. They will not work with anaconda. So download python from python.org
-* Download the mac bindings at https://research.cs.wisc.edu/htcondor/tarball/current/8.9.10/release/
-* Current version is [8.9.10](https://research.cs.wisc.edu/htcondor/tarball/current/8.9.10/release/condor-8.9.10-x86_64_MacOSX-unstripped.tar.gz)
-* Add /lib/python3 to PYTHONPATH.
+* `conda install -c conda-forge python-htcondor`
 * `import htcondor`
 
 ## Test Running Options
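The README change above swaps the manual macOS tarball install for the conda-forge package. A minimal sanity check that the conda-installed bindings work — a sketch, not part of this patch; the Schedd() call assumes a reachable local HTCondor schedd and will raise if none is configured:

    import htcondor

    # Confirm the bindings import and report their version string.
    print(htcondor.version())

    # Optionally query the local schedd for queued jobs.
    schedd = htcondor.Schedd()
    jobs = schedd.query(projection=["ClusterId", "ProcId", "JobStatus"])
    print(f"{len(jobs)} jobs in the local queue")
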
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 855352c76..159b675bf 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -1,6 +1,23 @@
 # execution_engine2 (ee2) release notes
 =========================================
 
+## 0.0.12
+* Forcing black to 22.1.0 to make sure that GHA doesn't suddenly fail
+* Prevent jobs that never ran from submitting job execution stats
+
+
+## 0.0.11
+* Add ability for `kbase` user to contact condor via token
+
+## 0.0.10
+* Fixes bug with ee2 not recording all jobs with the catalog during the process
+of finishing a job
+* Updates GHA with black and flake8
+* Fix flake8 and black formatting issues by formatting MANY files
+* Updated docs for installing htcondor
+* Update many python libs in requirements.txt
+
+
 ## 0.0.9
 * Update GHA with latest actions, remove old actions
 * Change job defaults to result in
diff --git a/execution_engine2.html b/execution_engine2.html
index 14eda82c6..66e734929 100644
--- a/execution_engine2.html
+++ b/execution_engine2.html
@@ -1 +1 @@
-execution_engine2 [auto-generated KIDL API documentation for the execution_engine2 module, minified to a single HTML line; rendered text omitted here — see the regenerated version below]
\ No newline at end of file
moduleexecution_engine2{

/*
*@range[0,1]
*/
typedefintboolean;

/*
*A time in the format YYYY-MM-DDThh:mm:ssZ, where Z is either the
*character Z (representing the UTC timezone) or the difference
*in time to UTC in the format +/-HHMM, eg:
*2012-12-17T23:24:06-0500 (EST time)
*2013-04-03T08:56:32+0000 (UTC time)
*2013-04-03T08:56:32Z (UTC time)
*/
typedefstringtimestamp;

/*
*A job id.
*/
typedefstringjob_id;

/*
*A structure representing the Execution Engine status
*git_commit - the Git hash of the version of the module.
*version - the semantic version for the module.
*service - the name of the service.
*server_time - the current server timestamp since epoch
*
*# TODO - add some or all of the following
*reboot_mode - if 1, then in the process of rebooting
*stopping_mode - if 1, then in the process of stopping
*running_tasks_total - number of total running jobs
*running_tasks_per_user - mapping from user id to number of running jobs for that user
*tasks_in_queue - number of jobs in the queue that are not running
*/
typedefstructure{
stringgit_commit;
stringversion;
stringservice;
floatserver_time;
}
Status;

/*
*Returns the service configuration, including URL endpoints and timeouts.
*The returned values are:
*external-url - string - url of this service
*kbase-endpoint - string - url of the services endpoint for the KBase environment
*workspace-url - string - Workspace service url
*catalog-url - string - catalog service url
*shock-url - string - shock service url
*handle-url - string - handle service url
*auth-service-url - string - legacy auth service url
*auth-service-url-v2 - string - current auth service url
*auth-service-url-allow-insecure - boolean string (true or false) - whether to allow insecure requests
*scratch - string - local path to scratch directory
*executable - string - name of Job Runner executable
*docker_timeout - int - time in seconds before a job will be timed out and terminated
*initial_dir - string - initial dir for HTCondor to search for passed input/output files
*transfer_input_files - initial list of files to transfer to HTCondor for job running
*/
funcdeflist_config()returns(mapping<string,string>)authenticationoptional;

/*
*Returns the current running version of the execution_engine2 servicve as a semantic version string.
*/
funcdefver()returns(string)authenticationnone;

/*
*Simply check the status of this service to see queue details
*/
funcdefstatus()returns(Status)authenticationnone;

/*
*A workspace object reference of the form X/Y/Z, where
*X is the workspace id,
*Y is the object id,
*Z is the version.
*/
typedefstringwsref;

/*
*Narrative metadata for a job. All fields are optional.
*run_id - the Narrative-assigned ID of the job run. 1:1 with a job ID.
*token_id - the ID of the token used to run the method.
*tag - the release tag, e.g. dev/beta/release.
*cell_id - the ID of the narrative cell from which the job was run.
*/
typedefstructure{
stringrun_id;
stringtoken_id;
stringtag;
stringcell_id;
}
Meta;

/*
*Job requirements for a job. All fields are optional. To submit job requirements,
*the user must have full EE2 admin permissions. Ignored for the run concierge endpoint.
*
*request_cpus: the number of CPUs to request for the job.
*request_memory: the amount of memory, in MB, to request for the job.
*request_disk: the amount of disk space, in GB, to request for the job.
*client_group: the name of the client group on which to run the job.
*client_group_regex: Whether to treat the client group string, whether provided here,
*from the catalog, or as a default, as a regular expression when matching
*clientgroups. Default True for HTC, but the default depends on the scheduler.
*Omit to use the default.
*bill_to_user: the job will be counted against the provided user's fair share quota.
*ignore_concurrency_limits: ignore any limits on simultaneous job runs. Default false.
*scheduler_requirements: arbitrary key-value pairs to be provided to the job
*scheduler. Requires knowledge of the scheduler interface.
*debug_mode: Whether to run the job in debug mode. Default false.
*/
typedefstructure{
intrequest_cpus;
intrequst_memory;
intrequest_disk;
stringclient_group;
booleanclient_group_regex;
stringbill_to_user;
booleanignore_concurrency_limits;
mapping<string,string>scheduler_requirements;
booleandebug_mode;
}
JobRequirements;

/*
*method - the SDK method to run in module.method format, e.g.
*'KBaseTrees.construct_species_tree'
*params - the parameters to pass to the method.
*
*Optional parameters:
*app_id - the id of the Narrative application (UI) running this job (e.g.
*repo/name)
*service_ver - specific version of deployed service, last version is
*used if this parameter is not defined
*source_ws_objects - denotes the workspace objects that will serve as a
*source of data when running the SDK method. These references will
*be added to the autogenerated provenance. Must be in UPA format (e.g.
*6/90/4).
*meta - Narrative metadata to associate with the job.
*wsid - an optional workspace id to associate with the job. This is passed to the
*workspace service, which will share the job based on the permissions of
*the workspace rather than owner of the job
*parent_job_id - EE2 job id for the parent of the current job.
*For run_job and run_job_concierge, this value can be specified to denote
*the parent job of the job being created.
*Warning: No checking is done on the validity of the job ID, and the parent job
*record is not altered.
*Submitting a job with a parent ID to run_job_batch will cause an error to be
*returned.
*job_requirements: the requirements for the job. The user must have full EE2
*administration rights to use this parameter. Note that the job_requirements
*are not returned along with the rest of the job parameters when querying the EE2
*API - they are only considered when submitting a job.
*as_admin: run the job with full EE2 permissions, meaning that any supplied workspace
*IDs are not checked for accessibility and job_requirements may be supplied. The
*user must have full EE2 administration rights.
*Note that this field is not included in returned data when querying EE2.
*/
typedefstructure{
stringmethod;
stringapp_id;
list<UnspecifiedObject>params;
stringservice_ver;
list<wsref>source_ws_objects;
Metameta;
intwsid;
stringparent_job_id;
JobRequirementsjob_requirements;
booleanas_admin;
}
RunJobParams;

/*
*Start a new job.
*/
funcdefrun_job(RunJobParamsparams)returns(job_idjob_id)authenticationrequired;

/*
*Additional parameters for a batch job.
*wsid: the workspace with which to associate the parent job.
*as_admin: run the job with full EE2 permissions, meaning that any supplied workspace
*IDs are not checked for accessibility and job_requirements may be supplied. The
*user must have full EE2 administration rights.
*/
typedefstructure{
intwsid;
booleanas_admin;
}
BatchParams;

typedefstructure{
job_idbatch_id;
list<job_id>child_job_ids;
}
BatchSubmission;

typedefstructure{
job_idbatch_id;
list<job_id>child_job_ids;
booleanas_admin;
}
AbandonChildren;

/*
*Run a batch job, consisting of a parent job and one or more child jobs.
*Note that the as_admin parameters in the list of child jobs are ignored -
*only the as_admin parameter in the batch_params is considered.
*/
funcdefrun_job_batch(list<RunJobParams>params,BatchParamsbatch_params)returns(BatchSubmissionjob_ids)authenticationrequired;

/*
*job_id of retried job
*retry_id: job_id of the job that was launched
*str error: reason as to why that particular retry failed (available for bulk retry only)
*/
typedefstructure{
job_idjob_id;
job_idretry_id;
stringerror;
}
RetryResult;

/*
*job_id of job to retry
*as_admin: retry someone elses job in your namespace
*#TODO Possibly Add JobRequirements job_requirements;
*/
typedefstructure{
job_idjob_id;
booleanas_admin;
}
RetryParams;

/*
*job_ids of job to retry
*as_admin: retry someone else's job in your namespace
*#TODO: Possibly Add list<JobRequirements> job_requirements;
*/
typedefstructure{
list<job_id>job_ids;
booleanas_admin;
}
BulkRetryParams;

/*
*#TODO write retry parent tests to ensure BOTH the parent_job_id is present, and retry_job_id is present
*#TODO Add retry child that checks the status of the child? to prevent multiple retries
*Allowed Jobs
** Regular Job with no children
** Regular job with/without parent_id that runs a kbparallel call or a run_job_batch call
*Not Allowed
** Regular Job with children (Should not be possible to create yet)
** Batch Job Parent Container (Not a job, it won't do anything, except cancel it's child jobs)
*/
funcdefretry_job(RetryParamsparams)returns(RetryResultretry_result)authenticationrequired;

/*
*Same as retry_job, but accepts multiple jobs
*/
funcdefretry_jobs(BulkRetryParamsparams)returns(list<RetryResult>retry_result)authenticationrequired;

funcdefabandon_children(AbandonChildrenparams)returns(BatchSubmissionparent_and_child_ids)authenticationrequired;

/*
*EE2Constants Concierge Params are
*request_cpus: int
*request_memory: int in MB
*request_disk: int in GB
*job_priority: int = None range from -20 to +20, with higher values meaning better priority.
*Note: job_priority is currently not implemented.
*account_group: str = None # Someone elses account
*ignore_concurrency_limits: ignore any limits on simultaneous job runs.
*Default 1 (True).
*requirements_list: list = None ['machine=worker102','color=red']
*client_group: Optional[str] = CONCIERGE_CLIENTGROUP # You can leave default or specify a clientgroup
*client_group_regex: Whether to treat the client group string, whether provided here,
*from the catalog, or as a default, as a regular expression when matching
*clientgroups. Default True for HTC, but the default depends on the scheduler.
*Omit to use the default.
*debug_mode: Whether to run the job in debug mode. Default 0 (False).
*/
typedefstructure{
intrequest_cpu;
intrequest_memory;
intrequest_disk;
intjob_priority;
stringaccount_group;
booleanignore_concurrency_limits;
list<string>requirements_list;
stringclient_group;
booleanclient_group_regex;
booleandebug_mode;
}
ConciergeParams;

funcdefrun_job_concierge(RunJobParamsparams,ConciergeParamsconcierge_params)returns(job_idjob_id)authenticationrequired;

/*
*Get job params necessary for job execution
*@optionalas_admin
*/
typedefstructure{
job_idjob_id;
booleanas_admin;
}
GetJobParams;

funcdefget_job_params(GetJobParamsparams)returns(RunJobParamsparams)authenticationrequired;

/*
*job_id - a job id
*status - the new status to set for the job.
*/
typedefstructure{
job_idjob_id;
stringstatus;
booleanas_admin;
}
UpdateJobStatusParams;

funcdefupdate_job_status(UpdateJobStatusParamsparams)returns(job_idjob_id)authenticationrequired;

/*
*line - string - a string to set for the log line.
*is_error - int - if 1, then this line should be treated as an error, default 0
*ts - int - a timestamp since epoch in milliseconds for the log line (optional)
*
*@optionalts
*/
typedefstructure{
stringline;
booleanis_error;
intts;
}
LogLine;

/*
*@successWhether or not the add operation was successful
*@line_numberthe line number of the last added log
*/
typedefstructure{
booleansuccess;
intline_number;
}
AddJobLogsResults;

typedefstructure{
job_idjob_id;
booleanas_admin;
}
AddJobLogsParams;

funcdefadd_job_logs(AddJobLogsParamsparams,list<LogLine>lines)returns(AddJobLogsResultsresults)authenticationrequired;

/*
*last_line_number - common number of lines (including those in skip_lines
*parameter), this number can be used as next skip_lines value to
*skip already loaded lines next time.
*/
typedefstructure{
list<LogLine>lines;
intlast_line_number;
intcount;
}
GetJobLogsResults;

/*
*job id - the job id
*optional skip_lines Legacy Parameter for Offset
*optional offset Number of lines to skip (in case they were already loaded before).
*optional limit optional parameter, maximum number of lines returned
*optional as_admin request read access to record normally not allowed..
*/
typedefstructure{
job_idjob_id;
intskip_lines;
intoffset;
intlimit;
booleanas_admin;
}
GetJobLogsParams;

funcdefget_job_logs(GetJobLogsParamsparams)returns(GetJobLogsResults)authenticationrequired;

/*
*Error block of JSON RPC response
*/
typedefstructure{
stringname;
intcode;
stringmessage;
stringerror;
}
JsonRpcError;

/*
*job_id - string - the id of the job to mark completed or finished with an error
*error_message - string - optional unless job is finished with an error
*error_code - int - optional unless job finished with an error
*error - JsonRpcError - optional output from SDK Job Containers
*job_output - job output if job completed successfully
*/
typedefstructure{
job_idjob_id;
stringerror_message;
interror_code;
UnspecifiedObjectjob_output;
booleanas_admin;
}
FinishJobParams;

/*
*Register results of already started job
*/
funcdeffinish_job(FinishJobParamsparams)returns()authenticationrequired;

/*
*skip_estimation: default true. If set true, job will set to running status skipping estimation step
*/
typedefstructure{
job_idjob_id;
booleanskip_estimation;
booleanas_admin;
}
StartJobParams;

funcdefstart_job(StartJobParamsparams)returns()authenticationrequired;

/*
*exclude_fields: exclude certain fields to return. default None.
*exclude_fields strings can be one of fields defined in execution_engine2.db.models.models.Job
*/
typedefstructure{
job_idjob_id;
list<string>exclude_fields;
booleanas_admin;
}
CheckJobParams;

/*
*job_id - string - id of the job
*user - string - user who started the job
*wsid - int - optional id of the workspace where the job is bound
*authstrat - string - what strategy used to authenticate the job
*job_input - object - inputs to the job (from the run_job call) ## TODO - verify
*job_output - object - outputs from the job (from the run_job call) ## TODO - verify
*updated - int - timestamp since epoch in milliseconds of the last time the status was updated
*running - int - timestamp since epoch in milliseconds of when it entered the running state
*created - int - timestamp since epoch in milliseconds when the job was created
*finished - int - timestamp since epoch in milliseconds when the job was finished
*status - string - status of the job. one of the following:
*created - job has been created in the service
*estimating - an estimation job is running to estimate resources required for the main
*job, and which queue should be used
*queued - job is queued to be run
*running - job is running on a worker node
*completed - job was completed successfully
*error - job is no longer running, but failed with an error
*terminated - job is no longer running, terminated either due to user cancellation,
*admin cancellation, or some automated task
*error_code - int - internal reason why the job is an error. one of the following:
*0 - unknown
*1 - job crashed
*2 - job terminated by automation
*3 - job ran over time limit
*4 - job was missing its automated output document
*5 - job authentication token expired
*errormsg - string - message (e.g. stacktrace) accompanying an errored job
*error - object - the JSON-RPC error package that accompanies the error code and message
*
*#TODO, add these to the structure?
*condor_job_ads - dict - condor related job information
*
*retry_count - int - generated field based on length of retry_ids
*retry_ids - list - list of jobs that are retried based off of this job
*retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself
*
*batch_id - str - the coordinating job, if the job is a child job created via run_job_batch
*batch_job - bool - whether or not this is a batch parent container
*child_jobs - array - Only parent container should have child job ids
*
*scheduler_type - str - scheduler, such as awe or condor
*scheduler_id - str - scheduler generated id
*scheduler_estimator_id - str - id for the job spawned for estimation
*
*
*terminated_code - int - internal reason why a job was terminated, one of:
*0 - user cancellation
*1 - admin cancellation
*2 - terminated by some automatic process
*
*@optionalerror
*@optionalerror_code
*@optionalerrormsg
*@optionalterminated_code
*@optionalestimating
*@optionalrunning
*@optionalfinished
*/
typedefstructure{
job_idjob_id;
stringuser;
stringauthstrat;
intwsid;
stringstatus;
RunJobParamsjob_input;
intcreated;
intqueued;
intestimating;
intrunning;
intfinished;
intupdated;
interror_code;
stringerrormsg;
intterminated_code;
stringbatch_id;
}
JobState;

/*
*get current status of a job
*/
funcdefcheck_job(CheckJobParamsparams)returns(JobStatejob_state)authenticationrequired;

/*
*batch_jobstate - state of the coordinating job for the batch
*child_jobstates - states of child jobs
*IDEA: ADD aggregate_states - count of all available child job states, even if they are zero
*/
typedefstructure{
JobStatebatch_jobstate;
list<JobState>child_jobstates;
}
CheckJobBatchResults;

/*
*get current status of a parent job, and it's children, if it has any.
*/
funcdefcheck_job_batch(CheckJobParamsparams)returns(CheckJobBatchResults)authenticationrequired;

/*
*job_states - states of jobs
*could be mapping<job_id, JobState> or list<JobState>
*/
typedefstructure{
list<JobState>job_states;
}
CheckJobsResults;

/*
*As in check_job, exclude_fields strings can be used to exclude fields.
*see CheckJobParams for allowed strings.
*
*return_list - optional, return list of job state if set to 1. Otherwise return a dict. Default 1.
*/
typedefstructure{
list<job_id>job_ids;
list<string>exclude_fields;
booleanreturn_list;
}
CheckJobsParams;

funcdefcheck_jobs(CheckJobsParamsparams)returns(CheckJobsResults)authenticationrequired;

/*
*Check status of all jobs in a given workspace. Only checks jobs that have been associated
*with a workspace at their creation.
*
*return_list - optional, return list of job state if set to 1. Otherwise return a dict. Default 0.
*/
typedefstructure{
stringworkspace_id;
list<string>exclude_fields;
booleanreturn_list;
booleanas_admin;
}
CheckWorkspaceJobsParams;

funcdefcheck_workspace_jobs(CheckWorkspaceJobsParamsparams)returns(CheckJobsResults)authenticationrequired;

/*
*cancel_and_sigterm
*"""
*Reasons for why the job was cancelled
*Current Default is `terminated_by_user 0` so as to not update narrative client
*terminated_by_user = 0
*terminated_by_admin = 1
*terminated_by_automation = 2
*"""
*job_id job_id
*@optionalterminated_code
*/
typedefstructure{
job_idjob_id;
intterminated_code;
booleanas_admin;
}
CancelJobParams;

/*
*Cancels a job. This results in the status becoming "terminated" with termination_code 0.
*/
funcdefcancel_job(CancelJobParamsparams)returns()authenticationrequired;

/*
*job_id - id of job running method
*finished - indicates whether job is done (including error/cancel cases) or not
*canceled - whether the job is canceled or not.
*ujs_url - url of UserAndJobState service used by job service
*/
typedefstructure{
job_idjob_id;
booleanfinished;
booleancanceled;
stringujs_url;
booleanas_admin;
}
CheckJobCanceledResult;

/*
*Check whether a job has been canceled. This method is lightweight compared to check_job.
*/
funcdefcheck_job_canceled(CancelJobParamsparams)returns(CheckJobCanceledResultresult)authenticationrequired;

typedefstructure{
stringstatus;
}
GetJobStatusResult;

typedefstructure{
job_idjob_id;
booleanas_admin;
}
GetJobStatusParams;

/*
*Just returns the status string for a job of a given id.
*/
funcdefget_job_status(GetJobStatusParamsparams)returns(GetJobStatusResultresult)authenticationrequired;

/*
*Projection Fields
*user = StringField(required=True)
*authstrat = StringField(
*required=True, default="kbaseworkspace", validation=valid_authstrat
*)
*wsid = IntField(required=False)
*status = StringField(required=True, validation=valid_status)
*updated = DateTimeField(default=datetime.datetime.utcnow, autonow=True)
*estimating = DateTimeField(default=None) # Time when job began estimating
*running = DateTimeField(default=None) # Time when job started
*# Time when job finished, errored out, or was terminated by the user/admin
*finished = DateTimeField(default=None)
*errormsg = StringField()
*msg = StringField()
*error = DynamicField()
*
*terminated_code = IntField(validation=valid_termination_code)
*error_code = IntField(validation=valid_errorcode)
*scheduler_type = StringField()
*scheduler_id = StringField()
*scheduler_estimator_id = StringField()
*job_input = EmbeddedDocumentField(JobInput, required=True)
*job_output = DynamicField()
*/*
*
*
*/*
*Results of check_jobs_date_range methods.
*
*jobs - the jobs matching the query, up to `limit` jobs.
*count - the number of jobs returned.
*query_count - the number of jobs that matched the filters.
*filter - DEPRECATED - this field may change in the future. The filters that were
*applied to the jobs.
*skip - the number of jobs that were skipped prior to beginning to return jobs.
*projection - the list of fields included in the returned job. By default all fields.
*limit - the maximum number of jobs returned.
*sort_order - the order in which the results were sorted by the job ID - + for
*ascending, - for descending.
*
*TODO: DOCUMENT THE RETURN OF STATS mapping
*/
typedefstructure{
list<JobState>jobs;
intcount;
intquery_count;
mapping<string,string>filter;
intskip;
list<string>projection;
intlimit;
stringsort_order;
}
CheckJobsDateRangeResults;

/*
 * Check job for all jobs in a given date/time range for all users (Admin function)
 * Notes on start_time and end_time:
 * These fields are designated as floats but floats, ints, and strings are all
 * accepted. Times are determined as follows:
 * - if the field is a float or a string that contains a float and only a float,
 *   the field value is treated as seconds since the epoch.
 * - if the field is an int or a string that contains an int and only an int,
 *   the field value is treated as milliseconds since the epoch.
 * - if the field is a string not matching the criteria above, it is treated as
 *   a date and time. Nearly any unambiguous format can be parsed.
 *
 * float start_time - Filter based on job creation timestamp since epoch
 * float end_time - Filter based on job creation timestamp since epoch
 * list<string> projection - A list of fields to include in the projection, default ALL
 *     See "Projection Fields" above
 * list<string> filter - DEPRECATED: this field may change or be removed in the future.
 *     A list of simple filters to "AND" together, such as error_code=1, wsid=1234,
 *     terminated_code = 1
 * int limit - The maximum number of records to return
 * string user - The user whose job records will be returned. Optional. Default is the
 *     current user.
 * int offset - the number of jobs to skip before returning records.
 * boolean ascending - true to sort by job ID ascending, false descending.
 * boolean as_admin - true to run the query as an admin; user must have admin EE2
 *     permissions. Required if setting `user` to something other than your own.
 *     TODO: this seems to have no effect
 * @optional projection
 * @optional filter
 * @optional limit
 * @optional user
 * @optional offset
 * @optional ascending
 */
typedef structure {
    float start_time;
    float end_time;
    list<string> projection;
    list<string> filter;
    int limit;
    string user;
    int offset;
    boolean ascending;
    boolean as_admin;
} CheckJobsDateRangeParams;

funcdef check_jobs_date_range_for_user(CheckJobsDateRangeParams params) returns (CheckJobsDateRangeResults) authentication required;

funcdef check_jobs_date_range_for_all(CheckJobsDateRangeParams params) returns (CheckJobsDateRangeResults) authentication required;
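/*
 * Illustrative only: the start_time/end_time rules above can be made
 * concrete with a small normalizer. This sketches the documented behavior
 * and is not the server's actual parsing code; python-dateutil is assumed
 * only because it already appears in requirements.txt.
 *
 * from dateutil import parser as date_parser
 *
 * def normalize_epoch_seconds(value):
 *     """Normalize a start_time/end_time value to seconds since the epoch."""
 *     if isinstance(value, float):
 *         return value  # floats are already seconds since the epoch
 *     if isinstance(value, int):
 *         return value / 1000.0  # ints are milliseconds since the epoch
 *     try:
 *         return int(value) / 1000.0  # string holding only an int: milliseconds
 *     except ValueError:
 *         pass
 *     try:
 *         return float(value)  # string holding only a float: seconds
 *     except ValueError:
 *         pass
 *     return date_parser.parse(value).timestamp()  # any other parseable date/time
 */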

typedef structure {
    UnspecifiedObject held_job;
} HeldJob;

/*
 * Handle a held CONDOR job. You probably never want to run this; only the reaper should run it.
 */
funcdef handle_held_job(string cluster_id) returns (HeldJob) authentication required;

/*
 * Check if current user has ee2 admin rights.
 */
funcdef is_admin() returns (boolean) authentication required;

/*
 * str permission - One of 'r|w|x' (('read' | 'write' | 'none'))
 */
typedef structure {
    string permission;
} AdminRolesResults;

/*
 * Check if current user has ee2 admin rights.
 * If so, return the type of rights and their roles
 */
funcdef get_admin_permission() returns (AdminRolesResults) authentication required;
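/*
 * Illustrative only: a caller might act on the returned permission string
 * as below; the helper name is invented for illustration and the mapping
 * follows the 'r|w|x' note above.
 *
 * def has_admin_write(ee2) -> bool:
 *     """True if the current user holds ee2 admin write rights;
 *     'r' is read-only admin and 'x' means none."""
 *     return ee2.get_admin_permission()["permission"] == "w"
 */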

/*
 * Get a list of clientgroups manually extracted from the config file
 */
funcdef get_client_groups() returns (list<string> client_groups) authentication none;
};
\ No newline at end of file diff --git a/lib/execution_engine2/authorization/authstrategy.py b/lib/execution_engine2/authorization/authstrategy.py index 8ac55a034..5b6e8e547 100644 --- a/lib/execution_engine2/authorization/authstrategy.py +++ b/lib/execution_engine2/authorization/authstrategy.py @@ -2,7 +2,7 @@ A module with commands for checking user privileges for jobs. This doesn't include checking admin rights. """ -from typing import Dict, List +from typing import List from lib.execution_engine2.authorization.workspaceauth import WorkspaceAuth from lib.execution_engine2.db.models.models import Job from collections import defaultdict diff --git a/lib/execution_engine2/db/models/models.py b/lib/execution_engine2/db/models/models.py index 99e115412..4bff7e0c1 100644 --- a/lib/execution_engine2/db/models/models.py +++ b/lib/execution_engine2/db/models/models.py @@ -414,9 +414,9 @@ def save(self, *args, **kwargs): return super(HeldJob, self).save(*args, **kwargs) -### -### Unused fields that we might want -### +# +# Unused fields that we might want +# result_example = { "shocknodes": [], @@ -442,9 +442,9 @@ def save(self, *args, **kwargs): } -#### -#### Unused Stuff to look at -#### +# +# Unused Stuff to look at +# class Results(EmbeddedDocument): diff --git a/lib/execution_engine2/db/models/transfer/check_jobs_for_finish_times.py b/lib/execution_engine2/db/models/transfer/check_jobs_for_finish_times.py index b65576f1a..1221623b0 100644 --- a/lib/execution_engine2/db/models/transfer/check_jobs_for_finish_times.py +++ b/lib/execution_engine2/db/models/transfer/check_jobs_for_finish_times.py @@ -1,15 +1,15 @@ #!/usr/bin/env python -# type: ignore +from bson import ObjectId + try: from .transfer_ujs_njs import MigrateDatabases from lib.execution_engine2.db.models.models import Status, valid_status except Exception: from transfer_ujs_njs import MigrateDatabases - from models import * + from models import Status, valid_status ee2_jobs = MigrateDatabases().ee2_jobs -from bson import ObjectId count = 0 for job in ee2_jobs.find(): diff --git a/lib/execution_engine2/db/models/transfer/check_njs_jobs_for_ujs_id.py b/lib/execution_engine2/db/models/transfer/check_njs_jobs_for_ujs_id.py index 59460526f..3d8690f47 100644 --- a/lib/execution_engine2/db/models/transfer/check_njs_jobs_for_ujs_id.py +++ b/lib/execution_engine2/db/models/transfer/check_njs_jobs_for_ujs_id.py @@ -1,20 +1,17 @@ #!/usr/bin/env python # type: ignore +from collections import defaultdict + +from bson import ObjectId + try: from .transfer_ujs_njs import MigrateDatabases - from lib.execution_engine2.db.models.models import Status, valid_status except Exception: from transfer_ujs_njs import MigrateDatabases - from models import * -from bson import ObjectId - -from pprint import pprint njs_jobs_db = MigrateDatabases().njs_jobs ujs_jobs_db = MigrateDatabases().ujs_jobs -from collections import defaultdict - count = 0 missing_ujs = [] c = defaultdict(int) @@ -27,7 +24,6 @@ print(f"Couldn't find {job_id}, ") missing_ujs.append(job_id) - print("Max occurences", max(c.values())) print("Number of njs jobs", count) diff --git a/lib/execution_engine2/db/models/transfer/fix_transfer_ujs_njs.py b/lib/execution_engine2/db/models/transfer/fix_transfer_ujs_njs.py index cd88a86bd..1c2b46015 100644 --- a/lib/execution_engine2/db/models/transfer/fix_transfer_ujs_njs.py +++ b/lib/execution_engine2/db/models/transfer/fix_transfer_ujs_njs.py @@ -1,19 +1,17 @@ #!/usr/bin/env python -# type: ignore import os from collections import Counter from 
configparser import ConfigParser +from datetime import datetime -jobs_database_name = "ee2_jobs" from mongoengine import connect -from datetime import datetime - try: - from lib.execution_engine2.db.models.models import Job, Status, JobInput - + from lib.execution_engine2.db.models.models import Job, Status except Exception: - from models import Status, Job, JobInput + from models import Status, Job + +jobs_database_name = "ee2_jobs" class FixEE2JobsDatabase: diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py index 15005b031..eed19ed7a 100644 --- a/lib/execution_engine2/execution_engine2Impl.py +++ b/lib/execution_engine2/execution_engine2Impl.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -#BEGIN_HEADER +# BEGIN_HEADER import os import time @@ -10,7 +10,7 @@ from execution_engine2.utils.clients import get_client_set _AS_ADMIN = "as_admin" -#END_HEADER +# END_HEADER class execution_engine2: @@ -29,10 +29,10 @@ class execution_engine2: # the latter method is running. ######################################### noqa VERSION = "0.0.8" - GIT_URL = "https://github.com/mrcreosote/execution_engine2.git" - GIT_COMMIT_HASH = "2ad95ce47caa4f1e7b939651f2b1773840e67a8a" + GIT_URL = "" + GIT_COMMIT_HASH = "" - #BEGIN_CLASS_HEADER + # BEGIN_CLASS_HEADER MONGO_COLLECTION = "jobs" MONGO_AUTHMECHANISM = "DEFAULT" @@ -43,12 +43,12 @@ class execution_engine2: ADMIN_ROLES_CACHE_SIZE = 500 ADMIN_ROLES_CACHE_EXPIRE_TIME = 300 # seconds - #END_CLASS_HEADER + # END_CLASS_HEADER # config contains contents of config file in a hash or None if it couldn't # be found def __init__(self, config): - #BEGIN_CONSTRUCTOR + # BEGIN_CONSTRUCTOR self.config = config self.config["mongo-collection"] = self.MONGO_COLLECTION self.config.setdefault("mongo-authmechanism", self.MONGO_AUTHMECHANISM) @@ -66,7 +66,7 @@ def __init__(self, config): override = os.environ.get("OVERRIDE_CLIENT_GROUP") with open(configpath) as cf: self.clients = get_client_set(config, cf, override) - #END_CONSTRUCTOR + # END_CONSTRUCTOR pass @@ -92,7 +92,7 @@ def list_config(self, ctx): """ # ctx is the context object # return variables are: returnVal - #BEGIN list_config + # BEGIN list_config public_keys = [ "external-url", "kbase-endpoint", @@ -113,12 +113,13 @@ def list_config(self, ctx): returnVal = {key: self.config.get(key) for key in public_keys} - #END list_config + # END list_config # At some point might do deeper type checking... if not isinstance(returnVal, dict): - raise ValueError('Method list_config return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method list_config ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -129,14 +130,15 @@ def ver(self, ctx): """ # ctx is the context object # return variables are: returnVal - #BEGIN ver + # BEGIN ver returnVal = self.VERSION - #END ver + # END ver # At some point might do deeper type checking... 
if not isinstance(returnVal, str): - raise ValueError('Method ver return value ' + - 'returnVal is not type str as required.') + raise ValueError('Method ver ' + + 'return value returnVal ' + + 'is not type str as required.') # return the results return [returnVal] @@ -159,7 +161,7 @@ def status(self, ctx): """ # ctx is the context object # return variables are: returnVal - #BEGIN status + # BEGIN status returnVal = { "server_time": time.time(), "git_commit": self.GIT_COMMIT_HASH, @@ -167,12 +169,13 @@ def status(self, ctx): "service": self.SERVICE_NAME, } - #END status + # END status # At some point might do deeper type checking... if not isinstance(returnVal, dict): - raise ValueError('Method status return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method status ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -252,7 +255,7 @@ def run_job(self, ctx, params): """ # ctx is the context object # return variables are: job_id - #BEGIN run_job + # BEGIN run_job mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients = self.clients, @@ -260,12 +263,13 @@ def run_job(self, ctx, params): admin_permissions_cache=self.admin_permissions_cache, ) job_id = mr.run_job(params, as_admin=bool(params.get(_AS_ADMIN))) - #END run_job + # END run_job # At some point might do deeper type checking... if not isinstance(job_id, str): - raise ValueError('Method run_job return value ' + - 'job_id is not type str as required.') + raise ValueError('Method run_job ' + + 'return value job_id ' + + 'is not type str as required.') # return the results return [job_id] @@ -357,7 +361,7 @@ def run_job_batch(self, ctx, params, batch_params): """ # ctx is the context object # return variables are: job_ids - #BEGIN run_job_batch + # BEGIN run_job_batch mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients = self.clients, @@ -366,12 +370,13 @@ def run_job_batch(self, ctx, params, batch_params): ) job_ids = mr.run_job_batch( params, batch_params, as_admin=bool(batch_params.get(_AS_ADMIN))) - #END run_job_batch + # END run_job_batch # At some point might do deeper type checking... if not isinstance(job_ids, dict): - raise ValueError('Method run_job_batch return value ' + - 'job_ids is not type dict as required.') + raise ValueError('Method run_job_batch ' + + 'return value job_ids ' + + 'is not type dict as required.') # return the results return [job_ids] @@ -399,7 +404,7 @@ def retry_job(self, ctx, params): """ # ctx is the context object # return variables are: retry_result - #BEGIN retry_job + # BEGIN retry_job mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients = self.clients, @@ -407,12 +412,13 @@ def retry_job(self, ctx, params): admin_permissions_cache=self.admin_permissions_cache ) retry_result = mr.retry(job_id=params.get('job_id'), as_admin=params.get('as_admin')) - #END retry_job + # END retry_job # At some point might do deeper type checking... 
if not isinstance(retry_result, dict): - raise ValueError('Method retry_job return value ' + - 'retry_result is not type dict as required.') + raise ValueError('Method retry_job ' + + 'return value retry_result ' + + 'is not type dict as required.') # return the results return [retry_result] @@ -433,7 +439,7 @@ def retry_jobs(self, ctx, params): """ # ctx is the context object # return variables are: retry_result - #BEGIN retry_jobs + # BEGIN retry_jobs mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients = self.clients, @@ -441,12 +447,13 @@ def retry_jobs(self, ctx, params): admin_permissions_cache=self.admin_permissions_cache ) retry_result = mr.retry_multiple(job_ids=params.get('job_ids'), as_admin=params.get('as_admin')) - #END retry_jobs + # END retry_jobs # At some point might do deeper type checking... if not isinstance(retry_result, list): - raise ValueError('Method retry_jobs return value ' + - 'retry_result is not type list as required.') + raise ValueError('Method retry_jobs ' + + 'return value retry_result ' + + 'is not type list as required.') # return the results return [retry_result] @@ -462,7 +469,7 @@ def abandon_children(self, ctx, params): """ # ctx is the context object # return variables are: parent_and_child_ids - #BEGIN abandon_children + # BEGIN abandon_children mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -472,12 +479,13 @@ def abandon_children(self, ctx, params): parent_and_child_ids = mr.abandon_children(batch_id=params['batch_id'], child_job_ids=params['child_job_ids'], as_admin=params.get('as_admin')) - #END abandon_children + # END abandon_children # At some point might do deeper type checking... if not isinstance(parent_and_child_ids, dict): - raise ValueError('Method abandon_children return value ' + - 'parent_and_child_ids is not type dict as required.') + raise ValueError('Method abandon_children ' + + 'return value parent_and_child_ids ' + + 'is not type dict as required.') # return the results return [parent_and_child_ids] @@ -579,18 +587,19 @@ def run_job_concierge(self, ctx, params, concierge_params): """ # ctx is the context object # return variables are: job_id - #BEGIN run_job_concierge + # BEGIN run_job_concierge mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, ) job_id = mr.run_job_concierge(params=params,concierge_params=concierge_params) - #END run_job_concierge + # END run_job_concierge # At some point might do deeper type checking... if not isinstance(job_id, str): - raise ValueError('Method run_job_concierge return value ' + - 'job_id is not type str as required.') + raise ValueError('Method run_job_concierge ' + + 'return value job_id ' + + 'is not type str as required.') # return the results return [job_id] @@ -672,7 +681,7 @@ def get_job_params(self, ctx, params): """ # ctx is the context object # return variables are: params - #BEGIN get_job_params + # BEGIN get_job_params mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -680,12 +689,13 @@ def get_job_params(self, ctx, params): admin_permissions_cache=self.admin_permissions_cache, ) params = mr.get_job_params(job_id=params['job_id'], as_admin=params.get('as_admin')) - #END get_job_params + # END get_job_params # At some point might do deeper type checking... 
if not isinstance(params, dict): - raise ValueError('Method get_job_params return value ' + - 'params is not type dict as required.') + raise ValueError('Method get_job_params ' + + 'return value params ' + + 'is not type dict as required.') # return the results return [params] @@ -700,7 +710,7 @@ def update_job_status(self, ctx, params): """ # ctx is the context object # return variables are: job_id - #BEGIN update_job_status + # BEGIN update_job_status mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -710,12 +720,13 @@ def update_job_status(self, ctx, params): ) job_id = mr.update_job_status(job_id=params['job_id'], status=params['status'], as_admin=params.get('as_admin')) - #END update_job_status + # END update_job_status # At some point might do deeper type checking... if not isinstance(job_id, str): - raise ValueError('Method update_job_status return value ' + - 'job_id is not type str as required.') + raise ValueError('Method update_job_status ' + + 'return value job_id ' + + 'is not type str as required.') # return the results return [job_id] @@ -737,7 +748,7 @@ def add_job_logs(self, ctx, params, lines): """ # ctx is the context object # return variables are: results - #BEGIN add_job_logs + # BEGIN add_job_logs mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -750,12 +761,13 @@ def add_job_logs(self, ctx, params, lines): results = {'success': add_job_logs.success, 'line_number': add_job_logs.stored_line_count} - #END add_job_logs + # END add_job_logs # At some point might do deeper type checking... if not isinstance(results, dict): - raise ValueError('Method add_job_logs return value ' + - 'results is not type dict as required.') + raise ValueError('Method add_job_logs ' + + 'return value results ' + + 'is not type dict as required.') # return the results return [results] @@ -784,7 +796,7 @@ def get_job_logs(self, ctx, params): """ # ctx is the context object # return variables are: returnVal - #BEGIN get_job_logs + # BEGIN get_job_logs if params.get("skip_lines") and params.get("offset"): raise ValueError("Please provide only one of skip_lines or offset") @@ -800,12 +812,13 @@ def get_job_logs(self, ctx, params): limit=params.get("limit", None), as_admin=params.get('as_admin') ) - #END get_job_logs + # END get_job_logs # At some point might do deeper type checking... 
if not isinstance(returnVal, dict): - raise ValueError('Method get_job_logs return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method get_job_logs ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -827,7 +840,7 @@ def finish_job(self, ctx, params): parameter "as_admin" of type "boolean" (@range [0,1]) """ # ctx is the context object - #BEGIN finish_job + # BEGIN finish_job mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -843,7 +856,7 @@ def finish_job(self, ctx, params): as_admin=params.get('as_admin') ) - #END finish_job + # END finish_job pass def start_job(self, ctx, params): @@ -855,7 +868,7 @@ def start_job(self, ctx, params): [0,1]), parameter "as_admin" of type "boolean" (@range [0,1]) """ # ctx is the context object - #BEGIN start_job + # BEGIN start_job mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -866,7 +879,7 @@ def start_job(self, ctx, params): params["job_id"], skip_estimation=params.get("skip_estimation", True), as_admin=params.get('as_admin') ) - #END start_job + # END start_job pass def check_job(self, ctx, params): @@ -911,7 +924,7 @@ def check_job(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1004,7 +1017,7 @@ def check_job(self, ctx, params): """ # ctx is the context object # return variables are: job_state - #BEGIN check_job + # BEGIN check_job mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -1013,12 +1026,13 @@ def check_job(self, ctx, params): params["job_id"], exclude_fields=params.get("exclude_fields", None), as_admin=params.get('as_admin') ) - #END check_job + # END check_job # At some point might do deeper type checking... 
if not isinstance(job_state, dict): - raise ValueError('Method check_job return value ' + - 'job_state is not type dict as required.') + raise ValueError('Method check_job ' + + 'return value job_state ' + + 'is not type dict as required.') # return the results return [job_state] @@ -1033,19 +1047,19 @@ def check_job_batch(self, ctx, params): of list of String, parameter "as_admin" of type "boolean" (@range [0,1]) :returns: instance of type "CheckJobBatchResults" (batch_jobstate - - state of parent job of the batch child_jobstates - states of child - jobs IDEA: ADD aggregate_states - count of all available child job - states, even if they are zero) -> structure: parameter - "batch_jobstate" of type "JobState" (job_id - string - id of the - job user - string - user who started the job wsid - int - optional - id of the workspace where the job is bound authstrat - string - - what strategy used to authenticate the job job_input - object - - inputs to the job (from the run_job call) ## TODO - verify - job_output - object - outputs from the job (from the run_job call) - ## TODO - verify updated - int - timestamp since epoch in - milliseconds of the last time the status was updated running - int - - timestamp since epoch in milliseconds of when it entered the - running state created - int - timestamp since epoch in + state of the coordinating job for the batch child_jobstates - + states of child jobs IDEA: ADD aggregate_states - count of all + available child job states, even if they are zero) -> structure: + parameter "batch_jobstate" of type "JobState" (job_id - string - + id of the job user - string - user who started the job wsid - int + - optional id of the workspace where the job is bound authstrat - + string - what strategy used to authenticate the job job_input - + object - inputs to the job (from the run_job call) ## TODO - + verify job_output - object - outputs from the job (from the + run_job call) ## TODO - verify updated - int - timestamp since + epoch in milliseconds of the last time the status was updated + running - int - timestamp since epoch in milliseconds of when it + entered the running state created - int - timestamp since epoch in milliseconds when the job was created finished - int - timestamp since epoch in milliseconds when the job was finished status - string - status of the job. one of the following: created - job @@ -1068,7 +1082,7 @@ def check_job_batch(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1189,7 +1203,7 @@ def check_job_batch(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. 
Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1282,7 +1296,7 @@ def check_job_batch(self, ctx, params): """ # ctx is the context object # return variables are: returnVal - #BEGIN check_job_batch + # BEGIN check_job_batch mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -1291,12 +1305,13 @@ def check_job_batch(self, ctx, params): batch_id=params["job_id"], exclude_fields=params.get("exclude_fields", None), as_admin=params.get('as_admin') ) - #END check_job_batch + # END check_job_batch # At some point might do deeper type checking... if not isinstance(returnVal, dict): - raise ValueError('Method check_job_batch return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method check_job_batch ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -1343,7 +1358,7 @@ def check_jobs(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1436,7 +1451,7 @@ def check_jobs(self, ctx, params): """ # ctx is the context object # return variables are: returnVal - #BEGIN check_jobs + # BEGIN check_jobs mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -1447,12 +1462,13 @@ def check_jobs(self, ctx, params): return_list=params.get("return_list", 1), as_admin=params.get('as_admin') ) - #END check_jobs + # END check_jobs # At some point might do deeper type checking... if not isinstance(returnVal, dict): - raise ValueError('Method check_jobs return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method check_jobs ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -1500,7 +1516,7 @@ def check_workspace_jobs(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. 
Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1593,7 +1609,7 @@ def check_workspace_jobs(self, ctx, params): """ # ctx is the context object # return variables are: returnVal - #BEGIN check_workspace_jobs + # BEGIN check_workspace_jobs mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -1604,12 +1620,13 @@ def check_workspace_jobs(self, ctx, params): return_list=params.get("return_list", 1), as_admin=params.get('as_admin') ) - #END check_workspace_jobs + # END check_workspace_jobs # At some point might do deeper type checking... if not isinstance(returnVal, dict): - raise ValueError('Method check_workspace_jobs return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method check_workspace_jobs ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -1626,7 +1643,7 @@ def cancel_job(self, ctx, params): "as_admin" of type "boolean" (@range [0,1]) """ # ctx is the context object - #BEGIN cancel_job + # BEGIN cancel_job mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -1638,7 +1655,7 @@ def cancel_job(self, ctx, params): job_id=params["job_id"], terminated_code=params.get("terminated_code"), as_admin=params.get('as_admin') ) - #END cancel_job + # END cancel_job pass def check_job_canceled(self, ctx, params): @@ -1664,18 +1681,19 @@ def check_job_canceled(self, ctx, params): """ # ctx is the context object # return variables are: result - #BEGIN check_job_canceled + # BEGIN check_job_canceled mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, ) result = mr.check_job_canceled(job_id=params["job_id"], as_admin=params.get('as_admin')) - #END check_job_canceled + # END check_job_canceled # At some point might do deeper type checking... if not isinstance(result, dict): - raise ValueError('Method check_job_canceled return value ' + - 'result is not type dict as required.') + raise ValueError('Method check_job_canceled ' + + 'return value result ' + + 'is not type dict as required.') # return the results return [result] @@ -1690,7 +1708,7 @@ def get_job_status(self, ctx, params): """ # ctx is the context object # return variables are: result - #BEGIN get_job_status + # BEGIN get_job_status mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -1698,12 +1716,13 @@ def get_job_status(self, ctx, params): admin_permissions_cache=self.admin_permissions_cache, ) result = mr.get_job_status_field(job_id=params['job_id'], as_admin=params.get('as_admin')) - #END get_job_status + # END get_job_status # At some point might do deeper type checking... 
if not isinstance(result, dict): - raise ValueError('Method get_job_status return value ' + - 'result is not type dict as required.') + raise ValueError('Method get_job_status ' + + 'return value result ' + + 'is not type dict as required.') # return the results return [result] @@ -1801,7 +1820,7 @@ def check_jobs_date_range_for_user(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -1897,7 +1916,7 @@ def check_jobs_date_range_for_user(self, ctx, params): """ # ctx is the context object # return variables are: returnVal - #BEGIN check_jobs_date_range_for_user + # BEGIN check_jobs_date_range_for_user mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -1913,12 +1932,13 @@ def check_jobs_date_range_for_user(self, ctx, params): ascending=params.get("ascending"), as_admin=params.get('as_admin') ) - #END check_jobs_date_range_for_user + # END check_jobs_date_range_for_user # At some point might do deeper type checking... if not isinstance(returnVal, dict): - raise ValueError('Method check_jobs_date_range_for_user return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method check_jobs_date_range_for_user ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -2016,7 +2036,7 @@ def check_jobs_date_range_for_all(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -2112,7 +2132,7 @@ def check_jobs_date_range_for_all(self, ctx, params): """ # ctx is the context object # return variables are: returnVal - #BEGIN check_jobs_date_range_for_all + # BEGIN check_jobs_date_range_for_all mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, @@ -2128,12 +2148,13 @@ def check_jobs_date_range_for_all(self, ctx, params): as_admin=params.get('as_admin'), user="ALL", ) - #END check_jobs_date_range_for_all + # END check_jobs_date_range_for_all # At some point might do deeper type checking... 
if not isinstance(returnVal, dict): - raise ValueError('Method check_jobs_date_range_for_all return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method check_jobs_date_range_for_all ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -2146,18 +2167,19 @@ def handle_held_job(self, ctx, cluster_id): """ # ctx is the context object # return variables are: returnVal - #BEGIN handle_held_job + # BEGIN handle_held_job mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, ) returnVal = mr.handle_held_job(cluster_id=cluster_id) - #END handle_held_job + # END handle_held_job # At some point might do deeper type checking... if not isinstance(returnVal, dict): - raise ValueError('Method handle_held_job return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method handle_held_job ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -2168,18 +2190,19 @@ def is_admin(self, ctx): """ # ctx is the context object # return variables are: returnVal - #BEGIN is_admin + # BEGIN is_admin mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, ) returnVal = mr.check_is_admin() - #END is_admin + # END is_admin # At some point might do deeper type checking... if not isinstance(returnVal, int): - raise ValueError('Method is_admin return value ' + - 'returnVal is not type int as required.') + raise ValueError('Method is_admin ' + + 'return value returnVal ' + + 'is not type int as required.') # return the results return [returnVal] @@ -2193,18 +2216,19 @@ def get_admin_permission(self, ctx): """ # ctx is the context object # return variables are: returnVal - #BEGIN get_admin_permission + # BEGIN get_admin_permission mr = SDKMethodRunner( user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, ) returnVal = mr.get_admin_permission() - #END get_admin_permission + # END get_admin_permission # At some point might do deeper type checking... if not isinstance(returnVal, dict): - raise ValueError('Method get_admin_permission return value ' + - 'returnVal is not type dict as required.') + raise ValueError('Method get_admin_permission ' + + 'return value returnVal ' + + 'is not type dict as required.') # return the results return [returnVal] @@ -2215,15 +2239,16 @@ def get_client_groups(self, ctx): """ # ctx is the context object # return variables are: client_groups - #BEGIN get_client_groups + # BEGIN get_client_groups # TODO I think this needs to be actually extracted from the config file client_groups = ['njs', 'bigmem', 'bigmemlong', 'extreme', 'concierge', 'hpc', 'kb_upload', 'terabyte', 'multi_tb', 'kb_upload_bulk'] - #END get_client_groups + # END get_client_groups # At some point might do deeper type checking... 
if not isinstance(client_groups, list): - raise ValueError('Method get_client_groups return value ' + - 'client_groups is not type list as required.') + raise ValueError('Method get_client_groups ' + + 'return value client_groups ' + + 'is not type list as required.') # return the results return [client_groups] diff --git a/lib/execution_engine2/execution_engine2Server.py b/lib/execution_engine2/execution_engine2Server.py index b63fe2210..8f91c8ad8 100644 --- a/lib/execution_engine2/execution_engine2Server.py +++ b/lib/execution_engine2/execution_engine2Server.py @@ -22,7 +22,11 @@ from jsonrpcbase import ServerError as JSONServerError from biokbase import log -from execution_engine2.authclient import KBaseAuth as _KBaseAuth + +try: + from execution_engine2.authclient import KBaseAuth as _KBaseAuth +except ImportError: + from installed_clients.authclient import KBaseAuth as _KBaseAuth try: from ConfigParser import ConfigParser @@ -143,12 +147,6 @@ def call_py(self, ctx, jsondata): debugging purposes. """ rdata = jsondata - # we already deserialize the json string earlier in the server code, no - # need to do it again - # try: - # rdata = json.loads(jsondata) - # except ValueError: - # raise ParseError # set some default values for error handling request = self._get_default_vals() @@ -869,7 +867,3 @@ def process_async_cli(input_file_path, output_file_path, token): assert False, "unhandled option" start_server(host=host, port=port) -# print("Listening on port %s" % port) -# httpd = make_server( host, port, application) -# -# httpd.serve_forever() diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py index 2beed9a7a..5a7a8a635 100644 --- a/lib/execution_engine2/sdk/EE2Runjob.py +++ b/lib/execution_engine2/sdk/EE2Runjob.py @@ -46,6 +46,10 @@ DEBUG_MODE, ) from execution_engine2.utils.job_requirements_resolver import RequirementsType +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner # noqa: F401 _JOB_REQUIREMENTS = "job_reqs" _JOB_REQUIREMENTS_INCOMING = "job_requirements" @@ -75,12 +79,6 @@ class PreparedJobParams(NamedTuple): job_id: str -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner - - class EE2RunJob: def __init__(self, sdkmr): self.sdkmr = sdkmr # type: SDKMethodRunner diff --git a/lib/execution_engine2/sdk/EE2Status.py b/lib/execution_engine2/sdk/EE2Status.py index 053cfb77d..9d0a1f45e 100644 --- a/lib/execution_engine2/sdk/EE2Status.py +++ b/lib/execution_engine2/sdk/EE2Status.py @@ -10,6 +10,7 @@ ChildrenNotFoundError, ) from execution_engine2.sdk.EE2Constants import JobError +from execution_engine2.utils.arg_processing import parse_bool from lib.execution_engine2.authorization.authstrategy import can_read_jobs from lib.execution_engine2.db.models.models import ( Job, @@ -18,7 +19,6 @@ ErrorCode, TerminatedCode, ) -from execution_engine2.utils.arg_processing import parse_bool from lib.execution_engine2.utils.KafkaUtils import ( KafkaCancelJob, KafkaCondorCommand, @@ -367,8 +367,11 @@ def finish_job( error_message=None, ) ) + + # Only send jobs to catalog that actually ran on a worker + if job.running and job.running >= job.id.generation_time.timestamp(): self._send_exec_stats_to_catalog(job_id=job_id) - self._update_finished_job_with_usage(job_id, as_admin=as_admin) + self._update_finished_job_with_usage(job_id, as_admin=as_admin) def _update_finished_job_with_usage(self, job_id, 
as_admin=None) -> Dict: """ @@ -532,8 +535,12 @@ def check_workspace_jobs(self, workspace_id, exclude_fields=None, return_list=No return job_states def _send_exec_stats_to_catalog(self, job_id): - job = self.sdkmr.get_mongo_util().get_job(job_id) + # Some notes about app_ids in general + # Batch apps containers have an app_id of "batch_app" + # Download apps do not have an "app_id" or have it in the format of "module_id.app_name" + # Jobs launched directly via EE2 client directly should not specify an "app_id" + job = self.sdkmr.get_mongo_util().get_job(job_id) job_input = job.job_input log_exec_stats_params = dict() @@ -545,7 +552,8 @@ def _send_exec_stats_to_catalog(self, job_id): # notably the narrative data download code, maybe more # It's been this way for a long time, so leave for now log_exec_stats_params["app_module_name"] = app_id.split("/")[0] - log_exec_stats_params["app_id"] = app_id + log_exec_stats_params["app_id"] = app_id.split("/")[-1] + method = job_input.method log_exec_stats_params["func_module_name"] = method.split(".")[0] log_exec_stats_params["func_name"] = method.split(".")[-1] diff --git a/lib/execution_engine2/utils/SlackUtils.py b/lib/execution_engine2/utils/SlackUtils.py index 5a8c13fa8..ba9f6a815 100644 --- a/lib/execution_engine2/utils/SlackUtils.py +++ b/lib/execution_engine2/utils/SlackUtils.py @@ -41,7 +41,10 @@ def ee2_reaper_success( ): if not calculated_hold_reason: calculated_hold_reason = "Unknown" - message = f"Job {job_id} {batch_name} was successfully marked as error (status == {status}). It probably died because of {calculated_hold_reason} ({hold_reason} {hold_reason_code}" + message = ( + f"Job {job_id} {batch_name} was successfully marked as error (status == {status})." + + f" It probably died because of {calculated_hold_reason} ({hold_reason} {hold_reason_code}" + ) self.safe_chat_post_message(channel=self.channel, text=message) def run_job_message(self, job_id, scheduler_id, username): diff --git a/lib/execution_engine2/utils/slack_utils.py b/lib/execution_engine2/utils/slack_utils.py index 60455b2ee..e8c78bfa3 100644 --- a/lib/execution_engine2/utils/slack_utils.py +++ b/lib/execution_engine2/utils/slack_utils.py @@ -26,10 +26,11 @@ def send_slack_message(message): def _send_slack_message_chunks(message): - window = 15000 - for m in [message[i : i + window] for i in range(0, len(message), window)]: + for m in [ + message[i : i + window] for i in range(0, len(message), window) # noqa: E203 + ]: # noqa: E203 time.sleep(1) webhook_url = os.environ.get("SLACK_WEBHOOK_URL") slack_data = {"text": m} diff --git a/requirements-dev.txt b/requirements-dev.txt index a6676536b..6d7c06a25 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,7 +15,7 @@ cachetools==3.1.1 certifi==2019.6.16 cffi==1.14.0 chardet==3.0.4 -codecov==2.0.15 +codecov==2.0.16 configparser==3.7.4 confluent-kafka==1.5.0 coverage==4.5.3 @@ -44,7 +44,7 @@ mock==3.0.5 mongoengine==0.23.0 multidict==4.5.2 nose==1.3.7 -packaging==21.3; python_version >= '3.6' +packaging==23.0; python_version >= '3.7' pluggy==0.13.1 psutil==5.6.6 py==1.10.0 @@ -66,7 +66,7 @@ rfc3986==1.3.2 ruamel.yaml==0.15.87 sanic==19.6.0 sentry-sdk==0.14.3 -setuptools==62.3.2; python_version >= '3.7' +setuptools==67.2.0; python_version >= '3.7' six==1.14.0 slackclient==2.7.1 toml==0.10.1 diff --git a/requirements.txt b/requirements.txt index 618dc4908..be905f7b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,15 +2,15 @@ aiofiles==0.7.0 aiohttp==3.7.4.post0 cachetools==4.2.2 -codecov==2.1.11 
+codecov==2.1.13 configparser==5.0.2 -confluent-kafka==1.7.0 +confluent-kafka==1.9.2 coverage==5.5 docker==5.0.0 -gevent==21.1.2 +gevent==21.12.0 greenlet==1.1.0 gunicorn==20.1.0 -htcondor==9.1.0 +htcondor==9.12.0 Jinja2==3.0.1 JSONRPCBase==0.2.0 mock==4.0.3 @@ -24,7 +24,7 @@ python-dateutil==2.8.2 python-dotenv==0.18.0 requests==2.25.1 requests-mock==1.9.3 -sanic==21.6.0 +sanic==22.9.0 slackclient==2.9.3 toml==0.10.2 urllib3==1.26.6 diff --git a/scripts/entrypoint.sh b/scripts/entrypoint.sh index 2d5fbbab2..fce5ddc47 100755 --- a/scripts/entrypoint.sh +++ b/scripts/entrypoint.sh @@ -8,6 +8,14 @@ if [ $# -eq 0 ]; then /usr/sbin/condor_store_cred -p "${POOL_PASSWORD}" -f /etc/condor/password chown kbase:kbase /etc/condor/password fi + + #Add Condor Pool Token + if [ "$CONDOR_JWT_TOKEN" ] ; then + mkdir -p /home/kbase/.condor/tokens.d + echo "$CONDOR_JWT_TOKEN" > /home/kbase/.condor/tokens.d/JWT + chown kbase /home/kbase/.condor/tokens.d/JWT + chmod 600 /home/kbase/.condor/tokens.d/JWT + fi chown kbase /etc/condor/password # Copy downloaded JobRunner to a shared volume mount diff --git a/test/manual_tests/defunct_tests/ee2_catalog_test.py b/test/manual_tests/defunct_tests/ee2_catalog_test.py index 495e0ab83..8374efd85 100644 --- a/test/manual_tests/defunct_tests/ee2_catalog_test.py +++ b/test/manual_tests/defunct_tests/ee2_catalog_test.py @@ -1,4 +1,5 @@ import copy +import os import unittest from configparser import ConfigParser @@ -7,7 +8,7 @@ from test.utils_shared.test_utils import bootstrap bootstrap() -import os + print("Current in ", os.getcwd()) diff --git a/test/manual_tests/defunct_tests/ee2_scheduler_integration_test.py b/test/manual_tests/defunct_tests/ee2_scheduler_integration_test.py index e4774b694..a1d0a26ff 100644 --- a/test/manual_tests/defunct_tests/ee2_scheduler_integration_test.py +++ b/test/manual_tests/defunct_tests/ee2_scheduler_integration_test.py @@ -60,7 +60,7 @@ # "initial_dir": "/condor_shared", # "executable": "/bin/sleep", # "arguments": "5d38cbc5d9b2d9ce67fdbbc4 https://ci.kbase.us/services", -# "environment": "DOCKER_JOB_TIMEOUT=604805 KB_ADMIN_AUTH_TOKEN=test_auth_token KB_AUTH_TOKEN=XXXX CLIENTGROUP=None JOB_ID=5d38cbc5d9b2d9ce67fdbbc4 CONDOR_ID=$(Cluster).$(Process) ", +# "environment": "DOCKER_JOB_TIMEOUT=604805 KB_ADMIN_AUTH_TOKEN=test_auth_token KB_AUTH_TOKEN=XXXX CLIENTGROUP=None JOB_ID=5d38cbc5d9b2d9ce67fdbbc4 CONDOR_ID=$(Cluster).$(Process) ", # noqa: E501 # "universe": "vanilla", # "+AccountingGroup": "bsadkhin", # "Concurrency_Limits": "bsadkhin", diff --git a/test/manual_tests/ee2_scheduler_online_test.py b/test/manual_tests/ee2_scheduler_online_test.py index 97f41ce81..e9e7fd769 100644 --- a/test/manual_tests/ee2_scheduler_online_test.py +++ b/test/manual_tests/ee2_scheduler_online_test.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- import logging -import unittest - -logging.basicConfig(level=logging.INFO) - -from lib.installed_clients.execution_engine2Client import execution_engine2 -from lib.installed_clients.WorkspaceClient import Workspace import os import sys import time +import unittest +from pprint import pprint from dotenv import load_dotenv -from pprint import pprint + +from lib.installed_clients.WorkspaceClient import Workspace +from lib.installed_clients.execution_engine2Client import execution_engine2 + +logging.basicConfig(level=logging.INFO) load_dotenv("env/test.env", verbose=True) diff --git a/test/tests_for_db/ee2_MongoUtil_test.py b/test/tests_for_db/ee2_MongoUtil_test.py index 4cd973e60..badeb5b3a 100644 --- 
a/test/tests_for_db/ee2_MongoUtil_test.py +++ b/test/tests_for_db/ee2_MongoUtil_test.py @@ -1,13 +1,12 @@ # -*- coding: utf-8 -*- import logging import os -import time import unittest -from pytest import raises from bson.objectid import ObjectId +from pytest import raises -from execution_engine2.db.MongoUtil import MongoUtil, JobIdPair +from execution_engine2.db.MongoUtil import MongoUtil from execution_engine2.db.models.models import Job, JobLog, Status from test.utils_shared.test_utils import ( bootstrap, diff --git a/test/tests_for_integration/api_to_db_test.py b/test/tests_for_integration/api_to_db_test.py index c2943d1d0..37da9c865 100644 --- a/test/tests_for_integration/api_to_db_test.py +++ b/test/tests_for_integration/api_to_db_test.py @@ -39,7 +39,6 @@ from bson import ObjectId from pytest import fixture, raises -from execution_engine2.exceptions import InvalidParameterForBatch from execution_engine2.sdk.EE2Constants import ADMIN_READ_ROLE, ADMIN_WRITE_ROLE from installed_clients.WorkspaceClient import Workspace from installed_clients.baseclient import ServerError @@ -382,7 +381,7 @@ def test_get_admin_permission_success(ee2_port): assert ee2cli_write.get_admin_permission() == {"permission": "w"} -######## run_job tests ######## +# run_job tests ######## def _get_htc_mocks(): @@ -820,7 +819,7 @@ def _run_job_fail(ee2_port, token, params, expected, throw_exception=False): assert_exception_correct(got.value, ServerError("name", 1, expected)) -######## run_job_concierge tests ######## +# run_job_concierge tests ######## def test_run_job_concierge_minimal(ee2_port, ws_controller, mongo_client): @@ -1140,7 +1139,7 @@ def _run_job_concierge_fail( assert_exception_correct(got.value, ServerError("name", 1, expected)) -######## run_job_batch tests ######## +# run_job_batch tests ######## def test_run_job_batch(ee2_port, ws_controller, mongo_client): diff --git a/test/tests_for_sdkmr/EE2Status_test.py b/test/tests_for_sdkmr/EE2Status_test.py index 26596dc6f..528105fbc 100644 --- a/test/tests_for_sdkmr/EE2Status_test.py +++ b/test/tests_for_sdkmr/EE2Status_test.py @@ -4,23 +4,25 @@ from logging import Logger from unittest.mock import create_autospec, call + from bson.objectid import ObjectId +from execution_engine2.db.MongoUtil import MongoUtil from execution_engine2.db.models.models import Job, Status, JobInput -from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from execution_engine2.sdk.EE2Status import JobsStatus, JobPermissions -from execution_engine2.db.MongoUtil import MongoUtil -from lib.execution_engine2.utils.KafkaUtils import KafkaClient, KafkaFinishJob -from lib.execution_engine2.utils.Condor import Condor +from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from installed_clients.CatalogClient import Catalog +from lib.execution_engine2.utils.Condor import Condor +from lib.execution_engine2.utils.KafkaUtils import KafkaClient, KafkaFinishJob def _finish_job_complete_minimal_get_test_job(job_id, sched, app_id, gitcommit, user): job = Job() job.id = ObjectId(job_id) - job.running = 123.0 - job.finished = 456.5 + + job.finished = job.id.generation_time.timestamp() + 10 job.status = Status.running.value + job.running = job.id.generation_time.timestamp() + 5 job.scheduler_id = sched job_input = JobInput() job.job_input = job_input @@ -116,13 +118,41 @@ def _finish_job_complete_minimal(app_id, app_module): "func_module_name": "module", "func_name": "method_id", "git_commit_hash": gitcommit, - "creation_time": 1615246649.0, # from Job ObjectId - 
"exec_start_time": 123.0, - "finish_time": 456.5, + "creation_time": ObjectId( + job_id + ).generation_time.timestamp(), # from Job ObjectId + "exec_start_time": ObjectId(job_id).generation_time.timestamp() + 5, + "finish_time": ObjectId(job_id).generation_time.timestamp() + 10, "is_error": 0, "job_id": job_id, } if app_id: + app_id = app_id.split("/")[-1] les_expected.update({"app_id": app_id, "app_module_name": app_module}) catalog.log_exec_stats.assert_called_once_with(les_expected) mongo.update_job_resources.assert_called_once_with(job_id, resources) + + # Ensure that catalog stats were not logged for a job that was created but failed before running + bad_running_timestamps = [-1, 0, None] + for timestamp in bad_running_timestamps: + log_exec_stats_call_count = catalog.log_exec_stats.call_count + update_finished_job_with_usage_call_count = ( + mongo.update_job_resources.call_count + ) + job_id2 = "6046b539ce9c58ecf8c3e5f4" + subject_job = _finish_job_complete_minimal_get_test_job( + job_id2, + sched, + app_id, + gitcommit, + user, + ) + subject_job.running = timestamp + subject_job.status = Status.created.value + sdkmr.get_job_with_permission.side_effect = [subject_job, subject_job] + JobsStatus(sdkmr).finish_job(subject_job, job_output=job_output) # no return + assert catalog.log_exec_stats.call_count == log_exec_stats_call_count + assert ( + mongo.update_job_resources.call_count + == update_finished_job_with_usage_call_count + ) diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py index 279cf0438..45ebb4604 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py @@ -3,7 +3,6 @@ import logging import os import unittest -from configparser import ConfigParser import requests_mock diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py index ce98c0fb9..88b8a5ae1 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py @@ -22,6 +22,7 @@ from execution_engine2.db.models.models import Job, Status, TerminatedCode from execution_engine2.exceptions import AuthError from execution_engine2.exceptions import InvalidStatusTransitionException +from execution_engine2.sdk.EE2Runjob import EE2RunJob from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from execution_engine2.sdk.job_submission_parameters import JobRequirements from execution_engine2.utils.Condor import Condor @@ -34,6 +35,8 @@ JobRequirementsResolver, RequirementsType, ) +from installed_clients.CatalogClient import Catalog +from installed_clients.WorkspaceClient import Workspace from test.tests_for_sdkmr.ee2_SDKMethodRunner_test_utils import ee2_sdkmr_test_helper from test.utils_shared.mock_utils import get_client_mocks, ALL_CLIENTS from test.utils_shared.test_utils import ( @@ -48,17 +51,17 @@ logging.basicConfig(level=logging.INFO) bootstrap() -from execution_engine2.sdk.EE2Runjob import EE2RunJob - -from installed_clients.CatalogClient import Catalog -from installed_clients.WorkspaceClient import Workspace - # TODO this isn't necessary with pytest, can just use regular old functions +# TODO Fix Cross Mock Pollution. Until then, run each test one by one to make sure it really passes. 
+# TODO Fix Cross Mock Pollution with the "Copy of the runner" likely being the culprit class ee2_SDKMethodRunner_test(unittest.TestCase): @classmethod def setUpClass(cls): - cls.config_file = os.environ.get("KB_DEPLOYMENT_CONFIG", "test/deploy.cfg") + cls.config_file = os.environ.get( + "KB_DEPLOYMENT_CONFIG", + "test/deploy.cfg", + ) logging.info(f"Loading config from {cls.config_file}") config_parser = ConfigParser() @@ -596,7 +599,8 @@ def test_finish_job(self, condor): runner = self.getRunner() runner._test_job_permissions = MagicMock(return_value=True) - runner.get_catalog().log_exec_stats = MagicMock(return_value=True) + mocked_catalog = runner.get_catalog() + mocked_catalog.log_exec_stats = MagicMock(return_value=True) # test missing job_id input with self.assertRaises(ValueError) as context1: @@ -651,6 +655,20 @@ def test_finish_job(self, condor): job_id=job_id, status=Status.running.value ) + expected_calls = { + "user_id": "wsadmin", + "app_module_name": "MEGAHIT", + "app_id": "run_megahit", + "func_module_name": "MEGAHIT", + "is_error": 0, + } + + for key in expected_calls: + assert ( + mocked_catalog.log_exec_stats.call_args[0][0][key] + == expected_calls[key] + ) + @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) def test_finish_job_with_error_message(self, condor): @@ -664,13 +682,9 @@ def test_finish_job_with_error_message(self, condor): condor._get_job_info = MagicMock(return_value={}) condor.get_job_resource_info = MagicMock(return_value={}) runner.condor = condor - runner._send_exec_stats_to_catalog = MagicMock(return_value=True) - runner.catalog_utils = MagicMock(return_value=True) + runner.catalog = MagicMock(return_value=True) runner._test_job_permissions = MagicMock(return_value=True) - # with self.assertRaises(InvalidStatusTransitionException): - # runner.finish_job(job_id=job_id, error_message="error message") - runner.start_job(job_id=job_id, skip_estimation=True) time.sleep(2) job = self.mongo_util.get_job(job_id=job_id) @@ -710,6 +724,20 @@ def test_finish_job_with_error_message(self, condor): self.mongo_util.get_job(job_id=job_id).delete() self.assertEqual(ori_job_count, Job.objects.count()) + expected_calls = { + "user_id": "wsadmin", + "app_module_name": "MEGAHIT", + "app_id": "run_megahit", + "func_module_name": "MEGAHIT", + "is_error": 1, + } + + for key in expected_calls: + assert ( + runner.catalog.log_exec_stats.call_args[0][0][key] + == expected_calls[key] + ) + @requests_mock.Mocker() def test_check_job_global_perm(self, rq_mock): rq_mock.add_matcher( @@ -930,17 +958,29 @@ def replace_job_id(self, job1, new_id): job1.delete() # flake8: noqa: C901 + @requests_mock.Mocker() @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True) - def test_check_jobs_date_range(self, condor_mock): + def test_check_jobs_date_range(self, rq_mock, condor_mock): + rq_mock.add_matcher( + run_job_adapter( + ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "n"}}, + ws_perms_global=[self.ws_id], + user_roles=[], + ) + ) + # Mock Mock Mock user_name = "wsadmin" - runner = self.getRunner() + runner.check_is_admin = MagicMock(return_value=True) + runner.catalog_cache.lookup_git_commit_version = MagicMock( + return_value="commit_goes_here" + ) + runner.workspace_auth = MagicMock() # TODO redo this test with dependency injection & autospec vs. 
monkey patching resolver = create_autospec( JobRequirementsResolver, spec_set=True, instance=True ) - runner.workspace_auth = MagicMock() runner.get_job_requirements_resolver = MagicMock(return_value=resolver) resolver.get_requirements_type.return_value = RequirementsType.STANDARD resolver.resolve_requirements.return_value = JobRequirements( @@ -949,11 +989,8 @@ def test_check_jobs_date_range(self, condor_mock): disk_GB=1, client_group="njs", ) - runner.auth.get_user = MagicMock(return_value=user_name) - runner.check_is_admin = MagicMock(return_value=True) runner.workspace_auth.can_read = MagicMock(return_value=True) - self.mock = MagicMock(return_value=True) # fixed_rj = RunJob(runner) diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py index af441a81d..8b296c3f3 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py @@ -11,7 +11,6 @@ from execution_engine2.exceptions import ( CannotRetryJob, - RetryFailureException, InvalidParameterForBatch, ) from execution_engine2.sdk.job_submission_parameters import JobRequirements @@ -24,6 +23,7 @@ from lib.execution_engine2.db.models.models import Job, Status from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from lib.execution_engine2.utils.CondorTuples import SubmissionInfo +from test.tests_for_sdkmr.ee2_SDKMethodRunner_test_utils import ee2_sdkmr_test_helper from test.utils_shared.test_utils import ( bootstrap, get_example_job, @@ -35,8 +35,6 @@ logging.basicConfig(level=logging.INFO) bootstrap() -from test.tests_for_sdkmr.ee2_SDKMethodRunner_test_utils import ee2_sdkmr_test_helper - class ee2_SDKMethodRunner_test(unittest.TestCase): @classmethod diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Status_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Status_test.py index f356d8ce0..2fa9a972e 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Status_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Status_test.py @@ -10,21 +10,20 @@ from mock import MagicMock from mongoengine import ValidationError +from execution_engine2.utils.clients import get_user_client_set, get_client_set from lib.execution_engine2.db.models.models import Job +from lib.execution_engine2.db.models.models import Status from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from lib.execution_engine2.utils.CondorTuples import SubmissionInfo -from execution_engine2.utils.clients import get_user_client_set, get_client_set from test.tests_for_sdkmr.ee2_SDKMethodRunner_test_utils import ee2_sdkmr_test_helper -from test.utils_shared.test_utils import bootstrap, get_example_job - -logging.basicConfig(level=logging.INFO) -bootstrap() - +from test.utils_shared.test_utils import bootstrap from test.utils_shared.test_utils import ( get_example_job_as_dict_for_runjob, run_job_adapter, ) -from lib.execution_engine2.db.models.models import Status + +logging.basicConfig(level=logging.INFO) +bootstrap() class ee2_SDKMethodRunner_test_status(unittest.TestCase): diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_utils.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_utils.py index 9ec251f22..c81e282ad 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_utils.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_utils.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- from lib.execution_engine2.db.models.models import Job, 
JobInput, Meta -from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner class ee2_sdkmr_test_helper: diff --git a/test/tests_for_sdkmr/ee2_load_test.py b/test/tests_for_sdkmr/ee2_load_test.py index 55b614e6f..829bfd45e 100644 --- a/test/tests_for_sdkmr/ee2_load_test.py +++ b/test/tests_for_sdkmr/ee2_load_test.py @@ -1,19 +1,19 @@ # -*- coding: utf-8 -*- -import copy import logging import os import queue import threading import time import unittest -from configparser import ConfigParser from unittest.mock import patch +from mock import MagicMock + from execution_engine2.authorization.workspaceauth import WorkspaceAuth from execution_engine2.db.MongoUtil import MongoUtil from execution_engine2.db.models.models import Job, Status from execution_engine2.execution_engine2Impl import execution_engine2 -from execution_engine2.sdk.EE2Status import JobsStatus +from execution_engine2.sdk.EE2Status import JobsStatus # noqa: F401 from execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner from execution_engine2.sdk.job_submission_parameters import JobRequirements from execution_engine2.utils.Condor import Condor @@ -28,7 +28,6 @@ logging.basicConfig(level=logging.INFO) bootstrap() -from mock import MagicMock class ee2_server_load_test(unittest.TestCase): diff --git a/tox.ini b/tox.ini index b9ccce43f..d64618164 100644 --- a/tox.ini +++ b/tox.ini @@ -1,11 +1,18 @@ [flake8] -max-line-length = 100 +max-line-length = 150 exclude = lib/biokbase, submodules, */prepare_deploy_cfg.py, - */NarrativeRunner_server_test.py, - test_scripts + test_scripts, + *deploy.cfg, + *deploy_docker_mongo.cfg, + *ini, + *.md + *lib/installed_clients*, + */execution_engine2Impl.py*, + *execution_engine2Server.py, + *lib/execution_engine2/authclient.py* putty-ignore = */__init__.py : F401,E126 *Impl.py : E265,E266