Skip to content

Commit

Permalink
Merge pull request #235 from kbase/add_latest_clients
Browse files Browse the repository at this point in the history
Add latest clients + Update Service Ver
  • Loading branch information
Tianhao-Gu authored May 2, 2020
2 parents 970409d + 686f790 commit a431cff
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 16 deletions.
11 changes: 8 additions & 3 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# execution_engine2 release notes
=========================================
## 0.0.2

0.0.0
-----
* Module created by kb-sdk init
* Fixed bug with service version displaying release instead of git commit
* Updated clients
* Update transfer script to handle failures

## 0.0.0

* Module created by kb-sdk init
11 changes: 5 additions & 6 deletions lib/execution_engine2/db/models/transfer/transfer_ujs_njs.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,10 @@ def get_njs_job_input(self, njs_job):
return job_input

def save_job(self, job):
self.jobs.append(job.to_mongo())
if len(self.jobs) > self.threshold:
print("INSERTING 1000 ELEMENTS")
self.ee2_jobs.insert_many(self.jobs)
self.jobs = []
try:
self.ee2_jobs.insert_one(document=job.to_mongo())
except Exception as e:
print(e)

def save_remnants(self):
self.ee2_jobs.insert_many(self.jobs)
Expand Down Expand Up @@ -314,7 +313,7 @@ def begin_job_transfer(self): # flake8: noqa

self.save_job(job)
# Save leftover jobs
self.save_remnants()
# self.save_remnants()

# TODO SAVE up to 5000 in memory and do a bulk insert
# a = []
Expand Down
12 changes: 9 additions & 3 deletions lib/execution_engine2/sdk/EE2Runjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ def _init_job_rec(
inputs.wsid = job.wsid
inputs.method = params.get("method")
inputs.params = params.get("params")

params["service_ver"] = self._get_module_git_commit(
params.get("method"), params.get("service_ver")
)
inputs.service_ver = params.get("service_ver")

inputs.app_id = params.get("app_id")
inputs.source_ws_objects = params.get("source_ws_objects")
inputs.parent_job_id = str(params.get("parent_job_id"))
Expand Down Expand Up @@ -96,6 +101,7 @@ def _init_job_rec(
self.sdkmr.logger.debug(job.job_input.to_mongo().to_dict())

with self.sdkmr.get_mongo_util().mongo_engine_connection():
self.sdkmr.logger.debug(job.to_mongo().to_dict())
job.save()

self.sdkmr.kafka_client.send_kafka_message(
Expand All @@ -110,6 +116,7 @@ def _get_module_git_commit(self, method, service_ver=None) -> Optional[str]:
if not service_ver:
service_ver = "release"

self.sdkmr.logger.debug(f"Getting commit for {module_name} {service_ver}")
module_version = self.sdkmr.catalog_utils.catalog.get_module_version(
{"module_name": module_name, "version": service_ver}
)
Expand Down Expand Up @@ -177,12 +184,11 @@ def _prepare_to_run(self, params, concierge_params) -> PreparedJobParams:
cgrr=normalized_resources
) # type: CondorResources
# insert initial job document into db

job_id = self._init_job_rec(
self.sdkmr.user_id, params, extracted_resources, concierge_params
)
params["service_ver"] = self._get_module_git_commit(
method, params.get("service_ver")
)

params["job_id"] = job_id
params["user_id"] = self.sdkmr.user_id
params["token"] = self.sdkmr.token
Expand Down
2 changes: 1 addition & 1 deletion lib/installed_clients/execution_engine2Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ def check_jobs_date_range_for_all(self, params, context=None):
def handle_held_job(self, cluster_id, context=None):
"""
Handle a held CONDOR job. You probably never want to run this, only the reaper should run it.
:param cluster_id: instance of Double
:param cluster_id: instance of String
:returns: instance of type "HeldJob" -> structure: parameter
"held_job" of unspecified object
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ def test_init_job_rec(self):
self.assertEqual(job_input.wsid, self.ws_id)
self.assertEqual(job_input.method, "MEGAHIT.run_megahit")
self.assertEqual(job_input.app_id, "MEGAHIT/run_megahit")
self.assertEqual(job_input.service_ver, "2.2.1")
# TODO this is an integration test
# self.assertEqual(job_input.service_ver, "2.2.1")
self.assertEqual(
job_input.service_ver, "048baf3c2b76cb923b3b4c52008ed77dbe20292d"
)

self.assertCountEqual(job_input.source_ws_objects, ["a/b/c", "e/d"])
self.assertEqual(job_input.parent_job_id, "9998")

Expand Down
7 changes: 5 additions & 2 deletions test/tests_for_sdkmr/ee2_load_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,12 @@ def getRunner(self) -> SDKMethodRunner:
# Initialize these clients from None
runner = copy.deepcopy(self.__class__.method_runner) # type : SDKMethodRunner
runner._ee2_status = runner.get_jobs_status() # type: JobsStatus
runner._ee2_status._send_exec_stats_to_catalog = MagicMock(return_val=True)
runner._ee2_status.update_finished_job_with_usage = MagicMock(return_val=True)
runner._ee2_status._send_exec_stats_to_catalog = MagicMock(return_value=True)
runner._ee2_status.update_finished_job_with_usage = MagicMock(return_value=True)
runner.get_runjob()
runner._ee2_runjob._get_module_git_commit = MagicMock(
return_value="GitCommithash"
)
runner.get_job_logs()
runner.get_condor()
runner.condor = MagicMock(autospec=True)
Expand Down

0 comments on commit a431cff

Please sign in to comment.