From 784009e378d360765fffa9ceac3854ee6e15b342 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Mon, 9 May 2022 13:29:16 -0700 Subject: [PATCH 01/10] job ts implementation --- docs/testing/HeadlessTesting.md | 4 +- src/biokbase/narrative/jobs/job.py | 75 ++++++++++++------- src/biokbase/narrative/jobs/jobcomm.py | 6 ++ src/biokbase/narrative/jobs/jobmanager.py | 8 +- src/biokbase/narrative/tests/test_job.py | 18 ++--- .../narrative/tests/test_jobmanager.py | 2 +- 6 files changed, 75 insertions(+), 38 deletions(-) diff --git a/docs/testing/HeadlessTesting.md b/docs/testing/HeadlessTesting.md index 08b862ff0f..0e4e9f12d3 100644 --- a/docs/testing/HeadlessTesting.md +++ b/docs/testing/HeadlessTesting.md @@ -161,9 +161,9 @@ job = AppManager().run_app( import time import pprint import json -while job.state()['job_state'] not in ['completed', 'suspend']: +while job.refresh_state()['job_state'] not in ['completed', 'suspend']: time.sleep(5) -job_result = job.state() +job_result = job.refresh_state() if job_result['job_state'] != 'completed': print "Failed - job did not complete: " + ",".join(job_result['status'][0:3]) else: diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index a33bde4680..f41bc7e83c 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -82,6 +82,33 @@ STATE_ATTRS = list(set(JOB_ATTRS) - set(JOB_INPUT_ATTRS) - set(NARR_CELL_INFO_ATTRS)) +def merge(d0: dict, d1: dict): + d0 = copy.deepcopy(d0) + merge_inplace(d0, d1) + return d0 + + +def merge_inplace(d0: dict, d1: dict): + """ + Recursively merge nested dicts d1 into d0, + overwriting any values in d0 that are not nested dicts. + Mutates d0 + """ + for k, v1 in d1.items(): + if k in d0: + v0 = d0[k] + is_dict_0 = isinstance(v0, dict) + is_dict_1 = isinstance(v1, dict) + if is_dict_0 ^ is_dict_1: + raise ValueError(f"For key {k}: is_dict(v0) xor is_dict(v1)") + elif not is_dict_0 and not is_dict_1: + d0[k] = v1 + elif is_dict_0 and is_dict_1: + merge_inplace(v0, v1) + else: + d0[k] = v0 + + class Job: _job_logs = [] _acc_state = None # accumulates state @@ -199,7 +226,7 @@ def __getattr__(self, name): # and need the state refresh. # But KBParallel/KB Batch App jobs may not have the # batch_job field - self.state(force_refresh=True).get( + self.refresh_state(force_refresh=True).get( "child_jobs", JOB_ATTR_DEFAULTS["child_jobs"] ) if self.batch_job @@ -216,7 +243,7 @@ def __getattr__(self, name): # retry_ids field so skip the state refresh self._acc_state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]) if self.batch_job or self.retry_parent - else self.state(force_refresh=True).get( + else self.refresh_state(force_refresh=True).get( "retry_ids", JOB_ATTR_DEFAULTS["retry_ids"] ) ), @@ -240,16 +267,11 @@ def __getattr__(self, name): def __setattr__(self, name, value): if name in STATE_ATTRS: - self._acc_state[name] = value + self._update_state({name: value}) elif name in JOB_INPUT_ATTRS: - self._acc_state["job_input"] = self._acc_state.get("job_input", {}) - self._acc_state["job_input"][name] = value + self._update_state({"job_input": {name: value}}) elif name in NARR_CELL_INFO_ATTRS: - self._acc_state["job_input"] = self._acc_state.get("job_input", {}) - self._acc_state["job_input"]["narrative_cell_info"] = self._acc_state[ - "job_input" - ].get("narrative_cell_info", {}) - self._acc_state["job_input"]["narrative_cell_info"][name] = value + self._update_state({"job_input": {"narrative_cell_info": {name: value}}}) else: object.__setattr__(self, name, value) @@ -274,11 +296,11 @@ def was_terminal(self): return self._acc_state.get("status") in TERMINAL_STATUSES def is_terminal(self): - self.state() + self.refresh_state() if self._acc_state.get("batch_job"): for child_job in self.children: if child_job._acc_state.get("status") != COMPLETED_STATUS: - child_job.state(force_refresh=True) + child_job.refresh_state(force_refresh=True) return self.was_terminal() def in_cells(self, cell_ids: List[str]) -> bool: @@ -326,7 +348,8 @@ def parameters(self): def _update_state(self, state: dict) -> None: """ - given a state data structure (as emitted by ee2), update the stored state in the job object + Given a state data structure (as emitted by ee2), update the stored state in the job object + All updates to the job state should go through here to keep the last_updated field accurate """ if not isinstance(state, dict): raise TypeError("state must be a dict") @@ -341,16 +364,18 @@ def _update_state(self, state: dict) -> None: # Check if there would be no change in updating # i.e., if state <= self._acc_state if self._acc_state is not None: - if {**self._acc_state, **state} == self._acc_state: + if merge(self._acc_state, state) == self._acc_state: return state = copy.deepcopy(state) if self._acc_state is None: self._acc_state = state else: - self._acc_state.update(state) + merge_inplace(self._acc_state, state) + + self.last_updated = time.time_ns() - def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): + def refresh_state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ Queries the job service to see the state of the current job. """ @@ -359,9 +384,9 @@ def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) state = self.query_ee2_state(self.job_id, init=False) self._update_state(state) - return self._internal_state(exclude) + return self.static_state(exclude) - def _internal_state(self, exclude=None): + def static_state(self, exclude=None): """Wrapper for self._acc_state""" state = copy.deepcopy(self._acc_state) self._trim_ee2_state(state, exclude) @@ -371,7 +396,7 @@ def output_state(self, state=None, no_refresh=False) -> dict: """ :param state: Supplied when the state is queried beforehand from EE2 in bulk, or when it is retrieved from a cache. If not supplied, must be - queried with self.state() or self._internal_state() + queried with self.refresh_state() or self.static_state() :return: dict, with structure { @@ -424,10 +449,10 @@ def output_state(self, state=None, no_refresh=False) -> dict: :rtype: dict """ if not state: - state = self._internal_state() if no_refresh else self.state() + state = self.static_state() if no_refresh else self.refresh_state() else: self._update_state(state) - state = self._internal_state() + state = self.static_state() if state is None: return self._create_error_state( @@ -475,10 +500,10 @@ def show_output_widget(self, state=None): from biokbase.narrative.widgetmanager import WidgetManager if not state: - state = self.state() + state = self.refresh_state() else: self._update_state(state) - state = self._internal_state() + state = self.static_state() if state["status"] == COMPLETED_STATUS and "job_output" in state: (output_widget, widget_params) = self._get_output_info(state) @@ -611,7 +636,7 @@ def info(self): print(f"Version: {spec['info']['ver']}") try: - state = self.state() + state = self.refresh_state() print(f"Status: {state['status']}") print("Inputs:\n------") pprint(self.params) @@ -631,7 +656,7 @@ def _repr_javascript_(self): """ output_widget_info = None try: - state = self.state() + state = self.refresh_state() spec = self.app_spec() if state.get("status", "") == COMPLETED_STATUS: (output_widget, widget_params) = self._get_output_info(state) diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index 9fdfc76627..ba58bc8ffb 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -1,5 +1,6 @@ import copy import threading +import time from typing import List, Union from ipykernel.comm import Comm @@ -354,6 +355,11 @@ def _get_job_states(self, job_id_list: list, ts: int = None) -> dict: :rtype: dict """ output_states = self._jm.get_job_states(job_id_list, ts) + + now = time.time_ns() + for output_state in output_states.values(): + output_state["last_checked"] = now + self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index 0c80738f11..14e66371b8 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -322,7 +322,7 @@ def _construct_job_output_state_set( def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: """ Retrieves the job states for the supplied job_ids, with the option to - replace any jobs that have not been updated since ts with a short stub + remove any jobs that have not been updated since ts Jobs that cannot be found in the `_running_jobs` index will return { @@ -340,6 +340,12 @@ def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: """ job_ids, error_ids = self._check_job_list(job_ids) output_states = self._construct_job_output_state_set(job_ids) + + if ts is not None: + for job_id in job_ids: + if self.get_job(job_id).last_updated < ts: + del output_states[job_id] + return self.add_errors_to_results(output_states, error_ids) def get_all_job_states(self, ignore_refresh_flag=False) -> dict: diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 7f8bf5dc3f..651324972c 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -74,7 +74,7 @@ def create_job_from_ee2(job_id, extra_data=None, children=None): def create_state_from_ee2(job_id, exclude_fields=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ - create the output of job.state() from raw job data + create the output of job.refresh_state() from raw job data """ state = get_test_job(job_id) @@ -178,7 +178,7 @@ def check_jobs_equal(self, jobl, jobr): self.assertEqual(jobl._acc_state, jobr._acc_state) with mock.patch(CLIENTS, get_mock_client): - self.assertEqual(jobl.state(), jobr.state()) + self.assertEqual(jobl.refresh_state(), jobr.refresh_state()) for attr in JOB_ATTRS: self.assertEqual(getattr(jobl, attr), getattr(jobr, attr)) @@ -201,7 +201,7 @@ def check_job_attrs(self, job, job_id, exp_attrs=None, skip_state=False): if not exp_attrs and not skip_state: state = create_state_from_ee2(job_id) with mock.patch(CLIENTS, get_mock_client): - self.assertEqual(state, job.state()) + self.assertEqual(state, job.refresh_state()) attrs = create_attrs_from_ee2(job_id) attrs.update(exp_attrs) @@ -322,7 +322,7 @@ def test_state__non_terminal(self): # ee2_state is fully populated (includes job_input, no job_output) job = create_job_from_ee2(JOB_CREATED) self.assertFalse(job.was_terminal()) - state = job.state() + state = job.refresh_state() self.assertFalse(job.was_terminal()) self.assertEqual(state["status"], "created") @@ -338,7 +338,7 @@ def test_state__terminal(self): expected = create_state_from_ee2(JOB_COMPLETED) with assert_obj_method_called(MockClients, "check_job", call_status=False): - state = job.state() + state = job.refresh_state() self.assertEqual(state["status"], "completed") self.assertEqual(state, expected) @@ -350,7 +350,7 @@ def test_state__raise_exception(self): job = create_job_from_ee2(JOB_CREATED) self.assertFalse(job.was_terminal()) with self.assertRaisesRegex(ServerError, "check_job failed"): - job.state() + job.refresh_state() def test_state__returns_none(self): def mock_state(self, state=None): @@ -400,7 +400,7 @@ def test_job_update__invalid_job_id(self): """ job = create_job_from_ee2(JOB_RUNNING) expected = create_state_from_ee2(JOB_RUNNING) - self.assertEqual(job.state(), expected) + self.assertEqual(job.refresh_state(), expected) # try to update it with the job state from a different job with self.assertRaisesRegex(ValueError, "Job ID mismatch in _update_state"): @@ -559,7 +559,7 @@ def test_parent_children__ok(self): mock.Mock(return_value={"status": COMPLETED_STATUS}), ): for child_job in child_jobs: - child_job.state(force_refresh=True) + child_job.refresh_state(force_refresh=True) self.assertTrue(parent_job.was_terminal()) @@ -726,7 +726,7 @@ def mock_check_job(self_, params): with mock.patch.object(MockClients, "check_job", mock_check_job): for job in child_jobs: - job.state(force_refresh=True) + job.refresh_state(force_refresh=True) self.assertTrue(batch_job.was_terminal()) diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index dc2c36bb05..82550c603a 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -736,7 +736,7 @@ def test_update_batch_job__change(self): new_child_ids = BATCH_CHILDREN[1:] + [JOB_CREATED, JOB_NOT_FOUND] def mock_check_job(params): - """Called from job.state()""" + """Called from job.refresh_state()""" job_id = params["job_id"] if job_id == BATCH_PARENT: return {"child_jobs": new_child_ids} From 1a03f4e08be3a3e1187121eeda657ff9241e0726 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Mon, 9 May 2022 15:56:45 -0700 Subject: [PATCH 02/10] rename job.static_state, remove job.is_terminal --- src/biokbase/narrative/jobs/job.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index f41bc7e83c..301f10c740 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -295,14 +295,6 @@ def was_terminal(self): else: return self._acc_state.get("status") in TERMINAL_STATUSES - def is_terminal(self): - self.refresh_state() - if self._acc_state.get("batch_job"): - for child_job in self.children: - if child_job._acc_state.get("status") != COMPLETED_STATUS: - child_job.refresh_state(force_refresh=True) - return self.was_terminal() - def in_cells(self, cell_ids: List[str]) -> bool: """ For job initialization. @@ -384,9 +376,9 @@ def refresh_state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE state = self.query_ee2_state(self.job_id, init=False) self._update_state(state) - return self.static_state(exclude) + return self.current_state(exclude) - def static_state(self, exclude=None): + def current_state(self, exclude=None): """Wrapper for self._acc_state""" state = copy.deepcopy(self._acc_state) self._trim_ee2_state(state, exclude) @@ -396,7 +388,7 @@ def output_state(self, state=None, no_refresh=False) -> dict: """ :param state: Supplied when the state is queried beforehand from EE2 in bulk, or when it is retrieved from a cache. If not supplied, must be - queried with self.refresh_state() or self.static_state() + queried with self.refresh_state() or self.current_state() :return: dict, with structure { @@ -449,10 +441,10 @@ def output_state(self, state=None, no_refresh=False) -> dict: :rtype: dict """ if not state: - state = self.static_state() if no_refresh else self.refresh_state() + state = self.current_state() if no_refresh else self.refresh_state() else: self._update_state(state) - state = self.static_state() + state = self.current_state() if state is None: return self._create_error_state( @@ -503,7 +495,7 @@ def show_output_widget(self, state=None): state = self.refresh_state() else: self._update_state(state) - state = self.static_state() + state = self.current_state() if state["status"] == COMPLETED_STATUS and "job_output" in state: (output_widget, widget_params) = self._get_output_info(state) From 02ae2de3de212056d3fff5bbc7dcbca8205b1c14 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Tue, 10 May 2022 03:45:05 -0700 Subject: [PATCH 03/10] add job ts tests --- src/biokbase/narrative/jobs/job.py | 30 +-- src/biokbase/narrative/jobs/jobcomm.py | 13 +- src/biokbase/narrative/jobs/util.py | 28 +++ .../narrative/tests/job_test_constants.py | 2 +- src/biokbase/narrative/tests/test_job.py | 2 +- src/biokbase/narrative/tests/test_job_util.py | 186 +++++++++++++++++- src/biokbase/narrative/tests/test_jobcomm.py | 106 ++++++++++ .../narrative/tests/test_jobmanager.py | 60 ++++++ 8 files changed, 391 insertions(+), 36 deletions(-) diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index 301f10c740..66d54c2a55 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -9,6 +9,7 @@ import biokbase.narrative.clients as clients from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state from biokbase.narrative.exception_util import transform_job_exception +from biokbase.narrative.jobs.util import merge, merge_inplace from .specmanager import SpecManager @@ -82,33 +83,6 @@ STATE_ATTRS = list(set(JOB_ATTRS) - set(JOB_INPUT_ATTRS) - set(NARR_CELL_INFO_ATTRS)) -def merge(d0: dict, d1: dict): - d0 = copy.deepcopy(d0) - merge_inplace(d0, d1) - return d0 - - -def merge_inplace(d0: dict, d1: dict): - """ - Recursively merge nested dicts d1 into d0, - overwriting any values in d0 that are not nested dicts. - Mutates d0 - """ - for k, v1 in d1.items(): - if k in d0: - v0 = d0[k] - is_dict_0 = isinstance(v0, dict) - is_dict_1 = isinstance(v1, dict) - if is_dict_0 ^ is_dict_1: - raise ValueError(f"For key {k}: is_dict(v0) xor is_dict(v1)") - elif not is_dict_0 and not is_dict_1: - d0[k] = v1 - elif is_dict_0 and is_dict_1: - merge_inplace(v0, v1) - else: - d0[k] = v0 - - class Job: _job_logs = [] _acc_state = None # accumulates state @@ -558,7 +532,7 @@ def log(self, first_line=0, num_lines=None): return (num_available_lines, []) return ( num_available_lines, - self._job_logs[first_line : first_line + num_lines], + self._job_logs[first_line: first_line + num_lines], ) def _update_log(self): diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index ba58bc8ffb..4bdf35f0d3 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -355,11 +355,6 @@ def _get_job_states(self, job_id_list: list, ts: int = None) -> dict: :rtype: dict """ output_states = self._jm.get_job_states(job_id_list, ts) - - now = time.time_ns() - for output_state in output_states.values(): - output_state["last_checked"] = now - self.send_comm_message(MESSAGE_TYPE["STATUS"], output_states) return output_states @@ -520,6 +515,14 @@ def send_comm_message(self, msg_type: str, content: dict) -> None: Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type and content. These just get encoded into the message itself. """ + # For STATUS responses, add a last_checked field + # to each output_state. Note: error states will have + # the last_checked field too + if msg_type == MESSAGE_TYPE["STATUS"]: + now = time.time_ns() + for output_state in content.values(): + output_state["last_checked"] = now + msg = {"msg_type": msg_type, "content": content} self._comm.send(msg) diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 8b9c07e77a..27aa4b525a 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -1,3 +1,4 @@ +import copy import json import os @@ -58,3 +59,30 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): ) return (config["params"], config["message_types"]) + + +def merge(d0: dict, d1: dict): + d0 = copy.deepcopy(d0) + merge_inplace(d0, d1) + return d0 + + +def merge_inplace(d0: dict, d1: dict): + """ + Recursively merge nested dicts d1 into d0, + overwriting any values in d0 that are not nested dicts. + Mutates d0 + """ + for k, v1 in d1.items(): + if k in d0: + v0 = d0[k] + is_dict_0 = isinstance(v0, dict) + is_dict_1 = isinstance(v1, dict) + if is_dict_0 ^ is_dict_1: + raise ValueError(f"For key {k}: is_dict(v0) xor is_dict(v1)") + elif not is_dict_0 and not is_dict_1: + d0[k] = v1 + elif is_dict_0 and is_dict_1: + merge_inplace(v0, v1) + else: + d0[k] = v1 diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index 597c43eec7..112c27bcbc 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -41,7 +41,7 @@ def get_test_jobs(job_ids): CLIENTS = "biokbase.narrative.clients.get" -TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" +JC_TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" TEST_EPOCH_NS = 42 # arbitrary epoch ns MAX_LOG_LINES = 10 diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 651324972c..e71f83090b 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -373,7 +373,7 @@ def mock_state(self, state=None): "created": 0, } - with mock.patch.object(Job, "state", mock_state): + with mock.patch.object(Job, "refresh_state", mock_state): state = job.output_state() self.assertEqual(expected, state) diff --git a/src/biokbase/narrative/tests/test_job_util.py b/src/biokbase/narrative/tests/test_job_util.py index 656d86f160..8ba414b704 100644 --- a/src/biokbase/narrative/tests/test_job_util.py +++ b/src/biokbase/narrative/tests/test_job_util.py @@ -1,6 +1,8 @@ +import copy +import re import unittest -from biokbase.narrative.jobs.util import load_job_constants +from biokbase.narrative.jobs.util import load_job_constants, merge, merge_inplace class JobUtilTestCase(unittest.TestCase): @@ -57,5 +59,187 @@ def test_load_job_constants__valid(self): self.assertIn(item, message_types) +class MergeTest(unittest.TestCase): + def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict): + d1_copy = copy.deepcopy(d1) + merge_inplace(d0, d1) + self.assertEqual( + d0, + exp_merge + ) + self.assertEqual( + d1, + d1_copy + ) + + def test_merge_inplace__empty(self): + d0 = {} + d1 = {} + self._check_merge_inplace( + d0, + d1, + {} + ) + + def test_merge_inplace__d0_empty(self): + # flat + d0 = {} + d1 = {"level00": "l00"} + self._check_merge_inplace( + d0, + d1, + {"level00": "l00"} + ) + + # nested + d0 = {} + d1 = { + "level00": "l00", + "level01": { + "level10": "l10" + } + } + self._check_merge_inplace( + d0, + d1, + { + "level00": "l00", + "level01": { + "level10": "l10" + } + } + ) + + def test_merge_inplace__d1_empty(self): + # flat + d0 = {"level00": "l00"} + d1 = {} + self._check_merge_inplace( + d0, + d1, + {"level00": "l00"} + ) + + # nested + d0 = { + "level00": "l00", + "level01": { + "level10": "l10" + } + } + d1 = {} + self._check_merge_inplace( + d0, + d1, + { + "level00": "l00", + "level01": { + "level10": "l10" + } + } + ) + + def test_merge_inplace__flat(self): + d0 = { + "level00": "l00", + "level01": "l01" + } + d1 = { + "level01": "l01_", + "level02": "l02" + } + self._check_merge_inplace( + d0, + d1, + { + "level00": "l00", + "level01": "l01_", + "level02": "l02" + } + ) + + def test_merge_inplace__nested(self): + d0 = { + "level00": { + "level10": { + "level20": "l20", + "level21": "l21" + } + }, + "level01": "l01" + } + d1 = { + "level00": { + "level10": { + "level22": "l22" + } + }, + "level01": "l01_" + } + self._check_merge_inplace( + d0, + d1, + { + "level00": { + "level10": { + "level20": "l20", + "level21": "l21", + "level22": "l22" + } + }, + "level01": "l01_" + } + ) + + def test_merge_inplace__xor_dicts(self): + d0 = { + "level00": {} + } + d1 = { + "level00": "l00", + "level01": "l01" + } + with self.assertRaisesRegex( + ValueError, + re.escape("For key level00: is_dict(v0) xor is_dict(v1)") + ): + merge_inplace(d0, d1) + + def test_merge(self): + d0 = { + "level00": "l00", + "level01": { + "level10": { + "level20": "l20" + } + }, + "level02": "l02" + } + d1 = { + "level01": { + "level10": { + "level20": "l20_" + } + } + } + d0_copy = copy.deepcopy(d0) + d1_copy = copy.deepcopy(d1) + d0_merge = merge(d0, d1) + self.assertEqual(d0, d0_copy) + self.assertEqual(d1, d1_copy) + self.assertEqual( + d0_merge, + { + "level00": "l00", + "level01": { + "level10": { + "level20": "l20_" + } + }, + "level02": "l02" + } + ) + + if __name__ == "__main__": unittest.main() diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index c526e1bebf..517d6ccf25 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -2,6 +2,7 @@ import itertools import os import re +import time import unittest from unittest import mock @@ -56,7 +57,10 @@ JOB_TERMINATED, MAX_LOG_LINES, REFRESH_STATE, + TEST_EPOCH_NS, + JC_TIME_NS, generate_error, + get_test_jobs, ) from .narrative_mock.mockclients import ( @@ -98,6 +102,14 @@ LOG_LINES = [{"is_error": 0, "line": f"This is line {i}"} for i in range(MAX_LOG_LINES)] +def ts_are_close(t0: int, t1: int) -> bool: + """ + t0 and t1 are epochs in nanoseconds. + Check that they are within 1s of each other + """ + return abs(t1 - t0) * 1e-9 <= 1 + + def make_comm_msg( msg_type: str, job_id_like, as_job_request: bool, content: dict = None ): @@ -485,7 +497,22 @@ def test_start_job_status_loop__no_jobs_stop_loop(self): # Lookup all job states # --------------------- + def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS): + """ + For STATUS responses, each output_state will have an extra field `last_checked` + that is variable and is not in the test data. Check that here and delete before + other checkd + """ + for output_state in output_states.values(): + self.assertIn("last_checked", output_state) + self.assertTrue( + last_checked == output_state["last_checked"] + or ts_are_close(last_checked, output_state["last_checked"]) + ) + del output_state["last_checked"] + @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def check_job_output_states( self, output_states=None, @@ -494,6 +521,7 @@ def check_job_output_states( response_type=STATUS, ok_states=None, error_states=None, + last_checked=TEST_EPOCH_NS, ): """ Handle any request that returns a dictionary of job state objects; this @@ -506,6 +534,7 @@ def check_job_output_states( :param params: params for the comm message (opt) :param ok_states: list of job IDs expected to be in the output :param error_states: list of job IDs expected to return a not found error + :param last_checked: ts in ns """ if not params: params = {} @@ -527,6 +556,11 @@ def check_job_output_states( msg, ) + # for STATUS responses, there will be a field `last_checked` + # that is variable and not in the test data. check that here and remove + if response_type == MESSAGE_TYPE["STATUS"]: + self._check_pop_last_checked(output_states, last_checked) + for job_id, state in output_states.items(): self.assertEqual(ALL_RESPONSE_DATA[STATUS][job_id], state) if job_id in ok_states: @@ -544,12 +578,19 @@ def test_get_all_job_states__ok(self): # ----------------------- # Lookup single job state # ----------------------- + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_state__1_ok(self): output_states = self.jc.get_job_state(JOB_COMPLETED) self.check_job_output_states( output_states=output_states, ok_states=[JOB_COMPLETED] ) + def test_get_job_state__live_ts(self): + output_states = self.jc.get_job_state(JOB_COMPLETED) + self.check_job_output_states( + output_states=output_states, ok_states=[JOB_COMPLETED], last_checked=time.time_ns() + ) + def test_get_job_state__no_job(self): with self.assertRaisesRegex( JobRequestException, re.escape(f"{JOBS_MISSING_ERR}: {[None]}") @@ -612,6 +653,7 @@ def test_get_job_states__batch_id__not_batch(self): self.check_batch_id__not_batch_test(STATUS) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_states__job_id_list__ee2_error(self): exc = Exception("Test exception") exc_message = str(exc) @@ -626,6 +668,8 @@ def mock_check_jobs(params): self.jc._handle_comm_message(req_dict) msg = self.jc._comm.last_message + self._check_pop_last_checked(msg["content"], TEST_EPOCH_NS) + expected = {job_id: copy.deepcopy(ALL_RESPONSE_DATA[STATUS][job_id]) for job_id in ALL_JOBS} for job_id in ACTIVE_JOBS: # add in the ee2_error message @@ -639,6 +683,65 @@ def mock_check_jobs(params): msg, ) + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__last_updated(self): + """ + Copied from test_jobmanager.py + But also tests the last_checked field + """ + # what FE will say was the last time the jobs were checked + ts = time.time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + # error ids + not_found_ids = [JOB_NOT_FOUND] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # all job IDs partitioned as + not_found_ids + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) # noqa: F841 + not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) # noqa: F841 + updated_active_ids = list(set(updated_ids) & set(active_ids)) + + def mock_check_jobs(self_, params): + """Mutate only chosen job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for job_id, job_state in job_states_ret.items(): + # if job is chosen to be updated, mutate it + if job_id in updated_active_ids: + job_state["updated"] += 1 + return job_states_ret + + rq = make_comm_msg(STATUS, job_ids + not_found_ids, False, {"ts": ts}) + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + output_states = self.jc._handle_comm_message(rq) + + expected = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) + for job_id in updated_active_ids + } + for job_state in expected.values(): + job_state["jobState"]["updated"] += 1 + expected[JOB_NOT_FOUND] = { + "job_id": JOB_NOT_FOUND, + "error": f"Cannot find job with ID {JOB_NOT_FOUND}" + } + + self._check_pop_last_checked(output_states, ts) + self.assertEqual( + expected, + output_states + ) + # ----------------------- # get cell job states # ----------------------- @@ -840,12 +943,15 @@ def test_cancel_jobs__job_id_list__all_bad_jobs(self): ) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_cancel_jobs__job_id_list__failure(self): # the mock client will throw an error with BATCH_RETRY_RUNNING job_id_list = [JOB_RUNNING, BATCH_RETRY_RUNNING] req_dict = make_comm_msg(CANCEL, job_id_list, False) output = self.jc._handle_comm_message(req_dict) + self._check_pop_last_checked(output) + expected = { JOB_RUNNING: ALL_RESPONSE_DATA[STATUS][JOB_RUNNING], BATCH_RETRY_RUNNING: { diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index 82550c603a..12893ef4fc 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -2,6 +2,7 @@ import itertools import os import re +import time import unittest from datetime import datetime from unittest import mock @@ -36,6 +37,7 @@ BATCH_CHILDREN, BATCH_ERROR_RETRIED, BATCH_PARENT, + BATCH_RETRY_RUNNING, BATCH_TERMINATED, BATCH_TERMINATED_RETRIED, CLIENTS, @@ -50,6 +52,7 @@ TEST_JOBS, generate_error, get_test_job, + get_test_jobs, ) from .narrative_mock.mockclients import ( @@ -707,6 +710,63 @@ def test_get_job_states__empty(self): ): self.jm.get_job_states([]) + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__last_updated(self): + """ + Test that only updated jobs return an actual state + and that the rest of the jobs are removed + """ + # what FE will say was the last time the jobs were checked + ts = time.time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + # error ids + not_found_ids = [JOB_NOT_FOUND] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # all job IDs partitioned as + not_found_ids + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) # noqa: F841 + not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) # noqa: F841 + updated_active_ids = list(set(updated_ids) & set(active_ids)) + + def mock_check_jobs(self_, params): + """Mutate only chosen job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for job_id, job_state in job_states_ret.items(): + # if job is chosen to be updated, mutate it + if job_id in updated_active_ids: + job_state["updated"] += 1 + return job_states_ret + + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + output_states = self.jm.get_job_states(job_ids + not_found_ids, ts=ts) + + expected = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) + for job_id in updated_active_ids + } + for job_state in expected.values(): + job_state["jobState"]["updated"] += 1 + expected[JOB_NOT_FOUND] = { + "job_id": JOB_NOT_FOUND, + "error": f"Cannot find job with ID {JOB_NOT_FOUND}" + } + + self.assertEqual( + expected, + output_states + ) + def test_update_batch_job__dne(self): with self.assertRaisesRegex( JobRequestException, f"{JOB_NOT_REG_ERR}: {JOB_NOT_FOUND}" From 96b748eeeab96a4daad2fb8cd3eee6776783b40a Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Thu, 12 May 2022 04:00:43 -0700 Subject: [PATCH 04/10] add error state when no updated jobs --- src/biokbase/narrative/jobs/jobcomm.py | 11 +- src/biokbase/narrative/jobs/jobmanager.py | 18 ++- src/biokbase/narrative/tests/test_job_util.py | 58 +++++---- src/biokbase/narrative/tests/test_jobcomm.py | 112 ++++++++++++++++-- .../narrative/tests/test_jobmanager.py | 57 --------- 5 files changed, 163 insertions(+), 93 deletions(-) diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index 4bdf35f0d3..d9580a92b7 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -117,7 +117,11 @@ def cell_id_list(self): @property def ts(self): - """This param is completely optional""" + """ + Optional field sent with STATUS requests indicating to filter out + job states in the STATUS response that have not been updated since + this epoch time (in ns) + """ return self.rq_data.get(PARAM["TS"]) @@ -200,10 +204,7 @@ def _get_job_ids(self, req: JobRequest) -> List[str]: if req.has_batch_id(): return self._jm.update_batch_job(req.batch_id) - try: - return req.job_id_list - except Exception as ex: - raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex + return req.job_id_list def start_job_status_loop( self, diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index 14e66371b8..57df1cc9eb 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -1,4 +1,5 @@ import copy +import time from datetime import datetime, timedelta, timezone from typing import List, Tuple @@ -29,13 +30,15 @@ __version__ = "0.0.1" JOB_NOT_REG_ERR = "Job ID is not registered" +JOB_NOT_REG_2_ERR = "Cannot find job with ID %s" # TODO unify these JOB_NOT_BATCH_ERR = "Job ID is not for a batch job" JOBS_TYPE_ERR = "List expected for job_id_list" JOBS_MISSING_ERR = "No valid job IDs provided" CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided" -DOES_NOT_EXIST = "does_not_exist" + +NO_UPDATED_JOBS_ERR = "No updated jobs" class JobManager: @@ -345,8 +348,17 @@ def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: for job_id in job_ids: if self.get_job(job_id).last_updated < ts: del output_states[job_id] + no_updated_jobs = ts is not None and job_ids and not output_states + + # add error_ids first in the unlikely case one of the error_ids + # is "error" which is a reserved key which is prioritized + # for indicating an actual error event + self.add_errors_to_results(output_states, error_ids) - return self.add_errors_to_results(output_states, error_ids) + if no_updated_jobs: + output_states["error"] = {"error": NO_UPDATED_JOBS_ERR} + + return output_states def get_all_job_states(self, ignore_refresh_flag=False) -> dict: """ @@ -725,7 +737,7 @@ def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict: for error_id in error_ids: results[error_id] = { "job_id": error_id, - "error": f"Cannot find job with ID {error_id}", + "error": JOB_NOT_REG_2_ERR % error_id, } return results diff --git a/src/biokbase/narrative/tests/test_job_util.py b/src/biokbase/narrative/tests/test_job_util.py index 8ba414b704..7f882b7529 100644 --- a/src/biokbase/narrative/tests/test_job_util.py +++ b/src/biokbase/narrative/tests/test_job_util.py @@ -60,7 +60,8 @@ def test_load_job_constants__valid(self): class MergeTest(unittest.TestCase): - def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict): + def _check(self, d0: dict, d1: dict, exp_merge: dict): + d0_copy = copy.deepcopy(d0) d1_copy = copy.deepcopy(d1) merge_inplace(d0, d1) self.assertEqual( @@ -71,11 +72,19 @@ def _check_merge_inplace(self, d0: dict, d1: dict, exp_merge: dict): d1, d1_copy ) + d0 = copy.deepcopy(d0_copy) + dmerge = merge(d0, d1) + self.assertEqual( + dmerge, + exp_merge + ) + self.assertEqual(d0, d0_copy) + self.assertEqual(d1, d1_copy) def test_merge_inplace__empty(self): d0 = {} d1 = {} - self._check_merge_inplace( + self._check( d0, d1, {} @@ -85,7 +94,7 @@ def test_merge_inplace__d0_empty(self): # flat d0 = {} d1 = {"level00": "l00"} - self._check_merge_inplace( + self._check( d0, d1, {"level00": "l00"} @@ -99,7 +108,7 @@ def test_merge_inplace__d0_empty(self): "level10": "l10" } } - self._check_merge_inplace( + self._check( d0, d1, { @@ -114,7 +123,7 @@ def test_merge_inplace__d1_empty(self): # flat d0 = {"level00": "l00"} d1 = {} - self._check_merge_inplace( + self._check( d0, d1, {"level00": "l00"} @@ -128,7 +137,7 @@ def test_merge_inplace__d1_empty(self): } } d1 = {} - self._check_merge_inplace( + self._check( d0, d1, { @@ -148,7 +157,7 @@ def test_merge_inplace__flat(self): "level01": "l01_", "level02": "l02" } - self._check_merge_inplace( + self._check( d0, d1, { @@ -163,7 +172,10 @@ def test_merge_inplace__nested(self): "level00": { "level10": { "level20": "l20", - "level21": "l21" + "level21": "l21", + "level23": { + "level30": "l30" + } } }, "level01": "l01" @@ -171,20 +183,30 @@ def test_merge_inplace__nested(self): d1 = { "level00": { "level10": { - "level22": "l22" + "level21": "l21_", + "level22": "l22", + "level24": { + "level30": "l30" + } } }, "level01": "l01_" } - self._check_merge_inplace( + self._check( d0, d1, { "level00": { "level10": { "level20": "l20", - "level21": "l21", - "level22": "l22" + "level21": "l21_", + "level22": "l22", + "level23": { + "level30": "l30" + }, + "level24": { + "level30": "l30" + } } }, "level01": "l01_" @@ -205,7 +227,7 @@ def test_merge_inplace__xor_dicts(self): ): merge_inplace(d0, d1) - def test_merge(self): + def test_random(self): d0 = { "level00": "l00", "level01": { @@ -222,13 +244,9 @@ def test_merge(self): } } } - d0_copy = copy.deepcopy(d0) - d1_copy = copy.deepcopy(d1) - d0_merge = merge(d0, d1) - self.assertEqual(d0, d0_copy) - self.assertEqual(d1, d1_copy) - self.assertEqual( - d0_merge, + self._check( + d0, + d1, { "level00": "l00", "level01": { diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index 517d6ccf25..a5249195f6 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -2,6 +2,7 @@ import itertools import os import re +import sys import time import unittest from unittest import mock @@ -25,7 +26,9 @@ from biokbase.narrative.jobs.jobmanager import ( JOB_NOT_BATCH_ERR, JOB_NOT_REG_ERR, + JOB_NOT_REG_2_ERR, JOBS_MISSING_ERR, + NO_UPDATED_JOBS_ERR, JobManager, ) from biokbase.narrative.tests.generate_test_results import ( @@ -501,7 +504,7 @@ def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS): """ For STATUS responses, each output_state will have an extra field `last_checked` that is variable and is not in the test data. Check that here and delete before - other checkd + other checks """ for output_state in output_states.values(): self.assertIn("last_checked", output_state) @@ -683,12 +686,36 @@ def mock_check_jobs(params): msg, ) + def _reset_last_updated(self): + """Set last_updated back a minute""" + for job_id in self.jm._running_jobs: + job = self.jm.get_job(job_id) + job.last_updated -= 60 * 1e9 + self.assertTrue(job.last_updated > 0) # sanity check + + def _check_last_updated(self, exp_updated): + """Make sure the right jobs had `last_updated` bumped""" + exp_not_updated = list(set(ALL_JOBS) - set(exp_updated)) # exclusion + now = time.time_ns() + + exp_updated = [ + self.jm.get_job(job_id).last_updated for job_id in exp_updated + ] + for ts in exp_updated: + self.assertTrue(ts_are_close(ts, now)) + + exp_not_updated = [ + self.jm.get_job(job_id).last_updated for job_id in exp_not_updated + ] + for ts in exp_not_updated: + # was long time ago + self.assertTrue(ts < now) + self.assertFalse(ts_are_close(ts, now)) + @mock.patch(CLIENTS, get_mock_client) - def test_get_job_states__last_updated(self): - """ - Copied from test_jobmanager.py - But also tests the last_checked field - """ + def test_get_job_states__by_last_updated(self): + self._reset_last_updated() + # what FE will say was the last time the jobs were checked ts = time.time_ns() @@ -733,14 +760,83 @@ def mock_check_jobs(self_, params): job_state["jobState"]["updated"] += 1 expected[JOB_NOT_FOUND] = { "job_id": JOB_NOT_FOUND, - "error": f"Cannot find job with ID {JOB_NOT_FOUND}" + "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND } - self._check_pop_last_checked(output_states, ts) + self._check_pop_last_checked(output_states, time.time_ns()) self.assertEqual( expected, output_states ) + self._check_last_updated(updated_active_ids) + + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__all_updated_jobs(self): + """ + If theoretically all the jobs were last checked at the beginning of time, + all job states would be returned + """ + self._reset_last_updated() + + def mock_check_jobs(self_, params): + """Mutate all given job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(ACTIVE_JOBS, lookup_ids) # sanity check + + job_states_ret = get_test_jobs(lookup_ids) + for _, job_state in job_states_ret.items(): + job_state["updated"] += 1 + return job_states_ret + + rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": 0}) + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + output_states = self.jc._handle_comm_message(rq) + + expected = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) + for job_id in ALL_JOBS + } + for job_id, job_state in expected.items(): + if job_id in ACTIVE_JOBS: + job_state["jobState"]["updated"] += 1 + expected[JOB_NOT_FOUND] = { + "job_id": JOB_NOT_FOUND, + "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND + } + + self._check_pop_last_checked(output_states, time.time_ns()) + self.assertEqual( + expected, + output_states + ) + self._check_last_updated(ACTIVE_JOBS) + + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__no_updated_jobs(self): + """ + If theoretically all the jobs were last checked at the end of time, + no job states would be returned, and there would be an error state + to indicate that + """ + self._reset_last_updated() + + rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": sys.maxsize}) + output_states = self.jc._handle_comm_message(rq) + + self._check_pop_last_checked(output_states, time.time_ns()) + self.assertEqual( + { + JOB_NOT_FOUND: { + "job_id": JOB_NOT_FOUND, + "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND + }, + "error": { + "error": NO_UPDATED_JOBS_ERR + } + }, + output_states + ) + self._check_last_updated([]) # ----------------------- # get cell job states diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index 12893ef4fc..2ac08301e5 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -710,63 +710,6 @@ def test_get_job_states__empty(self): ): self.jm.get_job_states([]) - @mock.patch(CLIENTS, get_mock_client) - def test_get_job_states__last_updated(self): - """ - Test that only updated jobs return an actual state - and that the rest of the jobs are removed - """ - # what FE will say was the last time the jobs were checked - ts = time.time_ns() - - # mix of terminal and not terminal - not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] - # not terminal - updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] - - # error ids - not_found_ids = [JOB_NOT_FOUND] - - job_ids = not_updated_ids + updated_ids - active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) - - # all job IDs partitioned as - not_found_ids - terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) # noqa: F841 - not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) # noqa: F841 - updated_active_ids = list(set(updated_ids) & set(active_ids)) - - def mock_check_jobs(self_, params): - """Mutate only chosen job states""" - lookup_ids = params["job_ids"] - self.assertCountEqual(active_ids, lookup_ids) # sanity check - - job_states_ret = get_test_jobs(lookup_ids) - for job_id, job_state in job_states_ret.items(): - # if job is chosen to be updated, mutate it - if job_id in updated_active_ids: - job_state["updated"] += 1 - return job_states_ret - - with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): - output_states = self.jm.get_job_states(job_ids + not_found_ids, ts=ts) - - expected = { - job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) - for job_id in updated_active_ids - } - for job_state in expected.values(): - job_state["jobState"]["updated"] += 1 - expected[JOB_NOT_FOUND] = { - "job_id": JOB_NOT_FOUND, - "error": f"Cannot find job with ID {JOB_NOT_FOUND}" - } - - self.assertEqual( - expected, - output_states - ) - def test_update_batch_job__dne(self): with self.assertRaisesRegex( JobRequestException, f"{JOB_NOT_REG_ERR}: {JOB_NOT_FOUND}" From 678cc6f8fdf936eb1754da8de9178b9c09b7af83 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Thu, 12 May 2022 22:43:18 -0700 Subject: [PATCH 05/10] set simultaneous last_updated --- src/biokbase/narrative/jobs/job.py | 4 +- src/biokbase/narrative/jobs/jobmanager.py | 8 ++- .../narrative/tests/job_test_constants.py | 7 ++ .../tests/narrative_mock/mockclients.py | 9 ++- src/biokbase/narrative/tests/test_job.py | 64 +++++++++++++++++ src/biokbase/narrative/tests/test_jobcomm.py | 68 ++++++++++++++----- .../narrative/tests/test_jobmanager.py | 3 - 7 files changed, 139 insertions(+), 24 deletions(-) diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index 66d54c2a55..6320ac48ec 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -312,7 +312,7 @@ def parameters(self): f"Unable to fetch parameters for job {self.job_id} - {e}" ) - def _update_state(self, state: dict) -> None: + def _update_state(self, state: dict, ts: int = None) -> None: """ Given a state data structure (as emitted by ee2), update the stored state in the job object All updates to the job state should go through here to keep the last_updated field accurate @@ -339,7 +339,7 @@ def _update_state(self, state: dict) -> None: else: merge_inplace(self._acc_state, state) - self.last_updated = time.time_ns() + self.last_updated = time.time_ns() if ts is None else ts def refresh_state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index 57df1cc9eb..8b692d3694 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -310,10 +310,16 @@ def _construct_job_output_state_set( # fill in the output states for the missing jobs # if the job fetch failed, add an error message to the output # and return the cached job state + now = time.time_ns() for job_id in jobs_to_lookup: job = self.get_job(job_id) if job_id in fetched_states: - output_states[job_id] = job.output_state(fetched_states[job_id]) + fetched_state = fetched_states[job_id] + # pre-emptively try a job state update + # so can mark the bolus of fetched (but changed) states + # with a simultaneous timestamp + job._update_state(fetched_state, now) + output_states[job_id] = job.output_state(fetched_state) else: # fetch the current state without updating it output_states[job_id] = job.output_state({}) diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index 112c27bcbc..b71531ff9b 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -32,6 +32,13 @@ def generate_error(job_id, err_type): return error_strings[err_type] +def trim_ee2_state(ee2_state, exclude_fields): + if exclude_fields: + for field in exclude_fields: + if field in ee2_state: + del ee2_state[field] + + def get_test_job(job_id): return copy.deepcopy(TEST_JOBS[job_id]) diff --git a/src/biokbase/narrative/tests/narrative_mock/mockclients.py b/src/biokbase/narrative/tests/narrative_mock/mockclients.py index 019d79308b..c81311f76a 100644 --- a/src/biokbase/narrative/tests/narrative_mock/mockclients.py +++ b/src/biokbase/narrative/tests/narrative_mock/mockclients.py @@ -17,6 +17,7 @@ READS_OBJ_1, READS_OBJ_2, generate_error, + trim_ee2_state, ) from biokbase.narrative.tests.generate_test_results import RETRIED_JOBS @@ -244,7 +245,13 @@ def list_objects(self, params): # ----- Execution Engine (EE2) functions ----- def check_workspace_jobs(self, params): - return self.job_state_data + ee2_states = self.job_state_data + if params.get("exclude_fields"): + for ee2_state in ee2_states.values(): + trim_ee2_state(ee2_state, params["exclude_fields"]) + if params.get("return_list"): + ee2_states = list(ee2_states.values()) + return ee2_states def run_job(self, params): return self.test_job_id diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index e71f83090b..0ed95ed5d0 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -11,6 +11,7 @@ from biokbase.narrative.jobs.job import ( COMPLETED_STATUS, EXCLUDED_JOB_STATE_FIELDS, + OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS, JOB_ATTR_DEFAULTS, JOB_ATTRS, Job, @@ -33,6 +34,7 @@ MAX_LOG_LINES, TERMINAL_JOBS, get_test_job, + trim_ee2_state, ) from .narrative_mock.mockclients import ( @@ -406,6 +408,68 @@ def test_job_update__invalid_job_id(self): with self.assertRaisesRegex(ValueError, "Job ID mismatch in _update_state"): job._update_state(get_test_job(JOB_COMPLETED)) + @mock.patch(CLIENTS, get_mock_client) + def test_job_update__last_updated__no_change(self): + for job_id, job in get_all_jobs().items(): + + last_updated = job.last_updated + + # job has full ee2 state + ee2_state = get_test_job(job_id) + job._acc_state = get_test_job(job_id) + + job._update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + job._update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, EXCLUDED_JOB_STATE_FIELDS) + job._update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS) + job._update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + job._update_state({}) + self.assertEqual(last_updated, job.last_updated) + + # job has init ee2 state + ee2_state = get_test_job(job_id) + job._acc_state = get_test_job(job_id) + trim_ee2_state(ee2_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + trim_ee2_state(job._acc_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + + job._update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, EXCLUDED_JOB_STATE_FIELDS) + job._update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS) + job._update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + job._update_state({}) + self.assertEqual(last_updated, job.last_updated) + + @mock.patch(CLIENTS, get_mock_client) + def test_job_update__last_updated__change(self): + for job_id, job in get_all_jobs().items(): + + last_updated = job.last_updated + + # job has init ee2 state + job._acc_state = get_test_job(job_id) + trim_ee2_state(job._acc_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + + ee2_state = get_test_job(job_id) + job._update_state(ee2_state) + self.assertTrue(last_updated < job.last_updated) + @mock.patch(CLIENTS, get_mock_client) def test_job_info(self): job = create_job_from_ee2(JOB_COMPLETED) diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index a5249195f6..32f79377ee 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -63,7 +63,9 @@ TEST_EPOCH_NS, JC_TIME_NS, generate_error, + get_test_job, get_test_jobs, + trim_ee2_state, ) from .narrative_mock.mockclients import ( @@ -687,30 +689,37 @@ def mock_check_jobs(params): ) def _reset_last_updated(self): - """Set last_updated back a minute""" + """Set each last_updated back 3min""" for job_id in self.jm._running_jobs: job = self.jm.get_job(job_id) - job.last_updated -= 60 * 1e9 + job.last_updated -= 180 * 1e9 self.assertTrue(job.last_updated > 0) # sanity check - def _check_last_updated(self, exp_updated): + def _check_last_updated(self, exp_updated_ids): """Make sure the right jobs had `last_updated` bumped""" - exp_not_updated = list(set(ALL_JOBS) - set(exp_updated)) # exclusion + exp_not_updated_ids = list(set(ALL_JOBS) - set(exp_updated_ids)) # exclusion now = time.time_ns() exp_updated = [ - self.jm.get_job(job_id).last_updated for job_id in exp_updated + self.jm.get_job(job_id).last_updated for job_id in exp_updated_ids ] for ts in exp_updated: self.assertTrue(ts_are_close(ts, now)) + # should all be the same + if exp_updated: + self.assertEqual( + len(set(exp_updated)), + 1, + list(zip(exp_updated_ids, exp_updated)) + ) exp_not_updated = [ - self.jm.get_job(job_id).last_updated for job_id in exp_not_updated + self.jm.get_job(job_id).last_updated for job_id in exp_not_updated_ids ] for ts in exp_not_updated: - # was long time ago - self.assertTrue(ts < now) - self.assertFalse(ts_are_close(ts, now)) + # was at least 3min ago + # (i.e., from self._reset_last_updated) + self.assertTrue(ts < now - 180 * 1e9) @mock.patch(CLIENTS, get_mock_client) def test_get_job_states__by_last_updated(self): @@ -736,21 +745,34 @@ def test_get_job_states__by_last_updated(self): not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) # noqa: F841 updated_active_ids = list(set(updated_ids) & set(active_ids)) + def mock_check_job(self_, params): + """Mutate only chosen job states""" + lookup_id = params["job_id"] + + job_state = get_test_job(lookup_id) + trim_ee2_state(job_state, params.get("exclude_fields")) + if lookup_id in updated_active_ids: + job_state["updated"] += 1 + + return job_state + def mock_check_jobs(self_, params): """Mutate only chosen job states""" lookup_ids = params["job_ids"] self.assertCountEqual(active_ids, lookup_ids) # sanity check - job_states_ret = get_test_jobs(lookup_ids) - for job_id, job_state in job_states_ret.items(): + job_states = get_test_jobs(lookup_ids) + for job_id, job_state in job_states.items(): + trim_ee2_state(job_state, params.get("exclude_fields")) # if job is chosen to be updated, mutate it if job_id in updated_active_ids: job_state["updated"] += 1 - return job_states_ret + return job_states rq = make_comm_msg(STATUS, job_ids + not_found_ids, False, {"ts": ts}) with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): - output_states = self.jc._handle_comm_message(rq) + with mock.patch.object(MockClients, "check_job", mock_check_job): + output_states = self.jc._handle_comm_message(rq) expected = { job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) @@ -778,19 +800,31 @@ def test_get_job_states__all_updated_jobs(self): """ self._reset_last_updated() + def mock_check_job(self_, params): + """Mutate all given job states""" + lookup_id = params["job_id"] + + job_state = get_test_job(lookup_id) + trim_ee2_state(job_state, params.get("exclude_fields")) + job_state["updated"] += 1 + + return job_state + def mock_check_jobs(self_, params): """Mutate all given job states""" lookup_ids = params["job_ids"] self.assertCountEqual(ACTIVE_JOBS, lookup_ids) # sanity check - job_states_ret = get_test_jobs(lookup_ids) - for _, job_state in job_states_ret.items(): + job_states = get_test_jobs(lookup_ids) + for _, job_state in job_states.items(): + trim_ee2_state(job_state, params.get("exclude_fields")) job_state["updated"] += 1 - return job_states_ret + return job_states rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": 0}) with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): - output_states = self.jc._handle_comm_message(rq) + with mock.patch.object(MockClients, "check_job", mock_check_job): + output_states = self.jc._handle_comm_message(rq) expected = { job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index 2ac08301e5..82550c603a 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -2,7 +2,6 @@ import itertools import os import re -import time import unittest from datetime import datetime from unittest import mock @@ -37,7 +36,6 @@ BATCH_CHILDREN, BATCH_ERROR_RETRIED, BATCH_PARENT, - BATCH_RETRY_RUNNING, BATCH_TERMINATED, BATCH_TERMINATED_RETRIED, CLIENTS, @@ -52,7 +50,6 @@ TEST_JOBS, generate_error, get_test_job, - get_test_jobs, ) from .narrative_mock.mockclients import ( From 0ad40e51fe4d15632d7dc6429a0866bc8be99a6b Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Tue, 17 May 2022 20:22:07 -0700 Subject: [PATCH 06/10] simulate time.time_ns(), which is python3.7+ --- src/biokbase/narrative/jobs/job.py | 4 ++-- src/biokbase/narrative/jobs/jobcomm.py | 5 ++--- src/biokbase/narrative/jobs/jobmanager.py | 4 ++-- src/biokbase/narrative/jobs/util.py | 7 +++++++ src/biokbase/narrative/tests/job_test_constants.py | 2 +- src/biokbase/narrative/tests/test_jobcomm.py | 14 +++++++------- 6 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index 6320ac48ec..06e1c4ddf1 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -9,7 +9,7 @@ import biokbase.narrative.clients as clients from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state from biokbase.narrative.exception_util import transform_job_exception -from biokbase.narrative.jobs.util import merge, merge_inplace +from biokbase.narrative.jobs.util import time_ns, merge, merge_inplace from .specmanager import SpecManager @@ -339,7 +339,7 @@ def _update_state(self, state: dict, ts: int = None) -> None: else: merge_inplace(self._acc_state, state) - self.last_updated = time.time_ns() if ts is None else ts + self.last_updated = time_ns() if ts is None else ts def refresh_state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index d9580a92b7..937493556a 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -1,6 +1,5 @@ import copy import threading -import time from typing import List, Union from ipykernel.comm import Comm @@ -8,7 +7,7 @@ from biokbase.narrative.common import kblogging from biokbase.narrative.exception_util import JobRequestException, NarrativeException from biokbase.narrative.jobs.jobmanager import JobManager -from biokbase.narrative.jobs.util import load_job_constants +from biokbase.narrative.jobs.util import load_job_constants, time_ns (PARAM, MESSAGE_TYPE) = load_job_constants() @@ -520,7 +519,7 @@ def send_comm_message(self, msg_type: str, content: dict) -> None: # to each output_state. Note: error states will have # the last_checked field too if msg_type == MESSAGE_TYPE["STATUS"]: - now = time.time_ns() + now = time_ns() for output_state in content.values(): output_state["last_checked"] = now diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index 8b692d3694..ff2c39903e 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -1,5 +1,4 @@ import copy -import time from datetime import datetime, timedelta, timezone from typing import List, Tuple @@ -15,6 +14,7 @@ ) from .job import JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, Job +from .util import time_ns """ KBase Job Manager @@ -310,7 +310,7 @@ def _construct_job_output_state_set( # fill in the output states for the missing jobs # if the job fetch failed, add an error message to the output # and return the cached job state - now = time.time_ns() + now = time_ns() for job_id in jobs_to_lookup: job = self.get_job(job_id) if job_id in fetched_states: diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 27aa4b525a..5852451f64 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -1,6 +1,8 @@ import copy import json import os +import time + JOB_CONFIG_FILE_PATH_PARTS = [ "kbase-extension", @@ -61,6 +63,11 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): return (config["params"], config["message_types"]) +def time_ns(): + """Simulate time.time_ns() which is only available in python 3.7+""" + return int(time.time() * 1e9) + + def merge(d0: dict, d1: dict): d0 = copy.deepcopy(d0) merge_inplace(d0, d1) diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index b71531ff9b..cfd6ab270f 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -48,7 +48,7 @@ def get_test_jobs(job_ids): CLIENTS = "biokbase.narrative.clients.get" -JC_TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" +JC_TIME_NS = "biokbase.narrative.jobs.jobcomm.time_ns" TEST_EPOCH_NS = 42 # arbitrary epoch ns MAX_LOG_LINES = 10 diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index 32f79377ee..7b6132c0a5 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -3,7 +3,6 @@ import os import re import sys -import time import unittest from unittest import mock @@ -31,6 +30,7 @@ NO_UPDATED_JOBS_ERR, JobManager, ) +from biokbase.narrative.jobs.util import time_ns from biokbase.narrative.tests.generate_test_results import ( ALL_RESPONSE_DATA, JOBS_BY_CELL_ID, @@ -593,7 +593,7 @@ def test_get_job_state__1_ok(self): def test_get_job_state__live_ts(self): output_states = self.jc.get_job_state(JOB_COMPLETED) self.check_job_output_states( - output_states=output_states, ok_states=[JOB_COMPLETED], last_checked=time.time_ns() + output_states=output_states, ok_states=[JOB_COMPLETED], last_checked=time_ns() ) def test_get_job_state__no_job(self): @@ -698,7 +698,7 @@ def _reset_last_updated(self): def _check_last_updated(self, exp_updated_ids): """Make sure the right jobs had `last_updated` bumped""" exp_not_updated_ids = list(set(ALL_JOBS) - set(exp_updated_ids)) # exclusion - now = time.time_ns() + now = time_ns() exp_updated = [ self.jm.get_job(job_id).last_updated for job_id in exp_updated_ids @@ -726,7 +726,7 @@ def test_get_job_states__by_last_updated(self): self._reset_last_updated() # what FE will say was the last time the jobs were checked - ts = time.time_ns() + ts = time_ns() # mix of terminal and not terminal not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] @@ -785,7 +785,7 @@ def mock_check_jobs(self_, params): "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND } - self._check_pop_last_checked(output_states, time.time_ns()) + self._check_pop_last_checked(output_states, time_ns()) self.assertEqual( expected, output_states @@ -838,7 +838,7 @@ def mock_check_jobs(self_, params): "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND } - self._check_pop_last_checked(output_states, time.time_ns()) + self._check_pop_last_checked(output_states, time_ns()) self.assertEqual( expected, output_states @@ -857,7 +857,7 @@ def test_get_job_states__no_updated_jobs(self): rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": sys.maxsize}) output_states = self.jc._handle_comm_message(rq) - self._check_pop_last_checked(output_states, time.time_ns()) + self._check_pop_last_checked(output_states, time_ns()) self.assertEqual( { JOB_NOT_FOUND: { From a995eb2f9a7c5b8fcc42fbbfcbde2a66043129fb Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Tue, 17 May 2022 20:42:02 -0700 Subject: [PATCH 07/10] improve comments --- src/biokbase/narrative/jobs/jobmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index ff2c39903e..1dae531d03 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -316,7 +316,7 @@ def _construct_job_output_state_set( if job_id in fetched_states: fetched_state = fetched_states[job_id] # pre-emptively try a job state update - # so can mark the bolus of fetched (but changed) states + # so can mark the set of fetched (but also changed) states # with a simultaneous timestamp job._update_state(fetched_state, now) output_states[job_id] = job.output_state(fetched_state) From fcf4be8d8bab5bea1cd5dd9572f23592cf5365ea Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Thu, 19 May 2022 10:44:32 -0700 Subject: [PATCH 08/10] revert using recursive job state merge --- src/biokbase/narrative/jobs/job.py | 12 +- src/biokbase/narrative/jobs/util.py | 28 --- src/biokbase/narrative/tests/test_job.py | 4 +- src/biokbase/narrative/tests/test_job_util.py | 204 +----------------- 4 files changed, 7 insertions(+), 241 deletions(-) diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index 06e1c4ddf1..dfd76ac891 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -9,7 +9,7 @@ import biokbase.narrative.clients as clients from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state from biokbase.narrative.exception_util import transform_job_exception -from biokbase.narrative.jobs.util import time_ns, merge, merge_inplace +from biokbase.narrative.jobs.util import time_ns from .specmanager import SpecManager @@ -240,12 +240,8 @@ def __getattr__(self, name): return attr[name]() def __setattr__(self, name, value): - if name in STATE_ATTRS: + if name in STATE_ATTRS: # TODO are/should these assignments be used? self._update_state({name: value}) - elif name in JOB_INPUT_ATTRS: - self._update_state({"job_input": {name: value}}) - elif name in NARR_CELL_INFO_ATTRS: - self._update_state({"job_input": {"narrative_cell_info": {name: value}}}) else: object.__setattr__(self, name, value) @@ -330,14 +326,14 @@ def _update_state(self, state: dict, ts: int = None) -> None: # Check if there would be no change in updating # i.e., if state <= self._acc_state if self._acc_state is not None: - if merge(self._acc_state, state) == self._acc_state: + if {**self._acc_state, **state} == self._acc_state: return state = copy.deepcopy(state) if self._acc_state is None: self._acc_state = state else: - merge_inplace(self._acc_state, state) + self._acc_state = {**self._acc_state, **state} self.last_updated = time_ns() if ts is None else ts diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 5852451f64..af8674b91e 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -1,4 +1,3 @@ -import copy import json import os import time @@ -66,30 +65,3 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): def time_ns(): """Simulate time.time_ns() which is only available in python 3.7+""" return int(time.time() * 1e9) - - -def merge(d0: dict, d1: dict): - d0 = copy.deepcopy(d0) - merge_inplace(d0, d1) - return d0 - - -def merge_inplace(d0: dict, d1: dict): - """ - Recursively merge nested dicts d1 into d0, - overwriting any values in d0 that are not nested dicts. - Mutates d0 - """ - for k, v1 in d1.items(): - if k in d0: - v0 = d0[k] - is_dict_0 = isinstance(v0, dict) - is_dict_1 = isinstance(v1, dict) - if is_dict_0 ^ is_dict_1: - raise ValueError(f"For key {k}: is_dict(v0) xor is_dict(v1)") - elif not is_dict_0 and not is_dict_1: - d0[k] = v1 - elif is_dict_0 and is_dict_1: - merge_inplace(v0, v1) - else: - d0[k] = v1 diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 0ed95ed5d0..908e8977b9 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -821,7 +821,7 @@ def test_in_cells__batch__same_cell(self): batch_job, child_jobs = batch_fam[0], batch_fam[1:] for job in child_jobs: - job.cell_id = "hello" + job._acc_state["job_input"]["narrative_cell_info"]["cell_id"] = "hello" self.assertTrue(batch_job.in_cells(["hi", "hello"])) @@ -833,7 +833,7 @@ def test_in_cells__batch__diff_cells(self): children_cell_ids = ["hi", "hello", "greetings"] for job, cell_id in zip(child_jobs, itertools.cycle(children_cell_ids)): - job.cell_id = cell_id + job._acc_state["job_input"]["narrative_cell_info"]["cell_id"] = cell_id for cell_id in children_cell_ids: self.assertTrue(batch_job.in_cells([cell_id])) diff --git a/src/biokbase/narrative/tests/test_job_util.py b/src/biokbase/narrative/tests/test_job_util.py index 7f882b7529..656d86f160 100644 --- a/src/biokbase/narrative/tests/test_job_util.py +++ b/src/biokbase/narrative/tests/test_job_util.py @@ -1,8 +1,6 @@ -import copy -import re import unittest -from biokbase.narrative.jobs.util import load_job_constants, merge, merge_inplace +from biokbase.narrative.jobs.util import load_job_constants class JobUtilTestCase(unittest.TestCase): @@ -59,205 +57,5 @@ def test_load_job_constants__valid(self): self.assertIn(item, message_types) -class MergeTest(unittest.TestCase): - def _check(self, d0: dict, d1: dict, exp_merge: dict): - d0_copy = copy.deepcopy(d0) - d1_copy = copy.deepcopy(d1) - merge_inplace(d0, d1) - self.assertEqual( - d0, - exp_merge - ) - self.assertEqual( - d1, - d1_copy - ) - d0 = copy.deepcopy(d0_copy) - dmerge = merge(d0, d1) - self.assertEqual( - dmerge, - exp_merge - ) - self.assertEqual(d0, d0_copy) - self.assertEqual(d1, d1_copy) - - def test_merge_inplace__empty(self): - d0 = {} - d1 = {} - self._check( - d0, - d1, - {} - ) - - def test_merge_inplace__d0_empty(self): - # flat - d0 = {} - d1 = {"level00": "l00"} - self._check( - d0, - d1, - {"level00": "l00"} - ) - - # nested - d0 = {} - d1 = { - "level00": "l00", - "level01": { - "level10": "l10" - } - } - self._check( - d0, - d1, - { - "level00": "l00", - "level01": { - "level10": "l10" - } - } - ) - - def test_merge_inplace__d1_empty(self): - # flat - d0 = {"level00": "l00"} - d1 = {} - self._check( - d0, - d1, - {"level00": "l00"} - ) - - # nested - d0 = { - "level00": "l00", - "level01": { - "level10": "l10" - } - } - d1 = {} - self._check( - d0, - d1, - { - "level00": "l00", - "level01": { - "level10": "l10" - } - } - ) - - def test_merge_inplace__flat(self): - d0 = { - "level00": "l00", - "level01": "l01" - } - d1 = { - "level01": "l01_", - "level02": "l02" - } - self._check( - d0, - d1, - { - "level00": "l00", - "level01": "l01_", - "level02": "l02" - } - ) - - def test_merge_inplace__nested(self): - d0 = { - "level00": { - "level10": { - "level20": "l20", - "level21": "l21", - "level23": { - "level30": "l30" - } - } - }, - "level01": "l01" - } - d1 = { - "level00": { - "level10": { - "level21": "l21_", - "level22": "l22", - "level24": { - "level30": "l30" - } - } - }, - "level01": "l01_" - } - self._check( - d0, - d1, - { - "level00": { - "level10": { - "level20": "l20", - "level21": "l21_", - "level22": "l22", - "level23": { - "level30": "l30" - }, - "level24": { - "level30": "l30" - } - } - }, - "level01": "l01_" - } - ) - - def test_merge_inplace__xor_dicts(self): - d0 = { - "level00": {} - } - d1 = { - "level00": "l00", - "level01": "l01" - } - with self.assertRaisesRegex( - ValueError, - re.escape("For key level00: is_dict(v0) xor is_dict(v1)") - ): - merge_inplace(d0, d1) - - def test_random(self): - d0 = { - "level00": "l00", - "level01": { - "level10": { - "level20": "l20" - } - }, - "level02": "l02" - } - d1 = { - "level01": { - "level10": { - "level20": "l20_" - } - } - } - self._check( - d0, - d1, - { - "level00": "l00", - "level01": { - "level10": { - "level20": "l20_" - } - }, - "level02": "l02" - } - ) - - if __name__ == "__main__": unittest.main() From 9020680f3834d4c618f6de874ac0c35092b68a16 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Thu, 19 May 2022 11:12:27 -0700 Subject: [PATCH 09/10] rename funcs/params --- src/biokbase/narrative/jobs/job.py | 36 +++++++++++------------ src/biokbase/narrative/jobs/jobmanager.py | 2 +- src/biokbase/narrative/tests/test_job.py | 28 +++++++++--------- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index dfd76ac891..2dd1a308d7 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -104,7 +104,7 @@ def __init__(self, ee2_state, extra_data=None, children=None): if ee2_state.get("job_id") is None: raise ValueError("Cannot create a job without a job ID!") - self._update_state(ee2_state) + self.update_state(ee2_state) self.extra_data = extra_data # verify parent-children relationship @@ -130,9 +130,9 @@ def from_job_ids(cls, job_ids, return_list=True): return jobs @staticmethod - def _trim_ee2_state(state: dict, exclude: list) -> None: - if exclude: - for field in exclude: + def _trim_ee2_state(state: dict, exclude_fields: list) -> None: + if exclude_fields: + for field in exclude_fields: if field in state: del state[field] @@ -241,7 +241,7 @@ def __getattr__(self, name): def __setattr__(self, name, value): if name in STATE_ATTRS: # TODO are/should these assignments be used? - self._update_state({name: value}) + self.update_state({name: value}) else: object.__setattr__(self, name, value) @@ -308,7 +308,7 @@ def parameters(self): f"Unable to fetch parameters for job {self.job_id} - {e}" ) - def _update_state(self, state: dict, ts: int = None) -> None: + def update_state(self, state: dict, ts: int = None) -> None: """ Given a state data structure (as emitted by ee2), update the stored state in the job object All updates to the job state should go through here to keep the last_updated field accurate @@ -320,7 +320,7 @@ def _update_state(self, state: dict, ts: int = None) -> None: if self._acc_state: if "job_id" in state and state["job_id"] != self.job_id: raise ValueError( - f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}" + f"Job ID mismatch in update_state: job ID: {self.job_id}; state ID: {state['job_id']}" ) # Check if there would be no change in updating @@ -337,28 +337,28 @@ def _update_state(self, state: dict, ts: int = None) -> None: self.last_updated = time_ns() if ts is None else ts - def refresh_state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): + def refresh_state(self, force_refresh=False, exclude_fields=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ Queries the job service to see the state of the current job. """ if force_refresh or not self.was_terminal(): state = self.query_ee2_state(self.job_id, init=False) - self._update_state(state) + self.update_state(state) - return self.current_state(exclude) + return self.cached_state(exclude_fields) - def current_state(self, exclude=None): + def cached_state(self, exclude_fields=None): """Wrapper for self._acc_state""" state = copy.deepcopy(self._acc_state) - self._trim_ee2_state(state, exclude) + self._trim_ee2_state(state, exclude_fields) return state def output_state(self, state=None, no_refresh=False) -> dict: """ :param state: Supplied when the state is queried beforehand from EE2 in bulk, or when it is retrieved from a cache. If not supplied, must be - queried with self.refresh_state() or self.current_state() + queried with self.refresh_state() or self.cached_state() :return: dict, with structure { @@ -411,10 +411,10 @@ def output_state(self, state=None, no_refresh=False) -> dict: :rtype: dict """ if not state: - state = self.current_state() if no_refresh else self.refresh_state() + state = self.cached_state() if no_refresh else self.refresh_state() else: - self._update_state(state) - state = self.current_state() + self.update_state(state) + state = self.cached_state() if state is None: return self._create_error_state( @@ -464,8 +464,8 @@ def show_output_widget(self, state=None): if not state: state = self.refresh_state() else: - self._update_state(state) - state = self.current_state() + self.update_state(state) + state = self.cached_state() if state["status"] == COMPLETED_STATUS and "job_output" in state: (output_widget, widget_params) = self._get_output_info(state) diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index 1dae531d03..6a1bffba93 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -318,7 +318,7 @@ def _construct_job_output_state_set( # pre-emptively try a job state update # so can mark the set of fetched (but also changed) states # with a simultaneous timestamp - job._update_state(fetched_state, now) + job.update_state(fetched_state, now) output_states[job_id] = job.output_state(fetched_state) else: # fetch the current state without updating it diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 908e8977b9..966148f7a1 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -389,10 +389,10 @@ def test_job_update__no_state(self): # should fail with error 'state must be a dict' with self.assertRaisesRegex(TypeError, "state must be a dict"): - job._update_state(None) + job.update_state(None) self.assertFalse(job.was_terminal()) - job._update_state({}) + job.update_state({}) self.assertFalse(job.was_terminal()) @mock.patch(CLIENTS, get_mock_client) @@ -405,8 +405,8 @@ def test_job_update__invalid_job_id(self): self.assertEqual(job.refresh_state(), expected) # try to update it with the job state from a different job - with self.assertRaisesRegex(ValueError, "Job ID mismatch in _update_state"): - job._update_state(get_test_job(JOB_COMPLETED)) + with self.assertRaisesRegex(ValueError, "Job ID mismatch in update_state"): + job.update_state(get_test_job(JOB_COMPLETED)) @mock.patch(CLIENTS, get_mock_client) def test_job_update__last_updated__no_change(self): @@ -418,22 +418,22 @@ def test_job_update__last_updated__no_change(self): ee2_state = get_test_job(job_id) job._acc_state = get_test_job(job_id) - job._update_state(ee2_state) + job.update_state(ee2_state) self.assertEqual(last_updated, job.last_updated) trim_ee2_state(ee2_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) - job._update_state(ee2_state) + job.update_state(ee2_state) self.assertEqual(last_updated, job.last_updated) trim_ee2_state(ee2_state, EXCLUDED_JOB_STATE_FIELDS) - job._update_state(ee2_state) + job.update_state(ee2_state) self.assertEqual(last_updated, job.last_updated) trim_ee2_state(ee2_state, OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS) - job._update_state(ee2_state) + job.update_state(ee2_state) self.assertEqual(last_updated, job.last_updated) - job._update_state({}) + job.update_state({}) self.assertEqual(last_updated, job.last_updated) # job has init ee2 state @@ -442,18 +442,18 @@ def test_job_update__last_updated__no_change(self): trim_ee2_state(ee2_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) trim_ee2_state(job._acc_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) - job._update_state(ee2_state) + job.update_state(ee2_state) self.assertEqual(last_updated, job.last_updated) trim_ee2_state(ee2_state, EXCLUDED_JOB_STATE_FIELDS) - job._update_state(ee2_state) + job.update_state(ee2_state) self.assertEqual(last_updated, job.last_updated) trim_ee2_state(ee2_state, OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS) - job._update_state(ee2_state) + job.update_state(ee2_state) self.assertEqual(last_updated, job.last_updated) - job._update_state({}) + job.update_state({}) self.assertEqual(last_updated, job.last_updated) @mock.patch(CLIENTS, get_mock_client) @@ -467,7 +467,7 @@ def test_job_update__last_updated__change(self): trim_ee2_state(job._acc_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) ee2_state = get_test_job(job_id) - job._update_state(ee2_state) + job.update_state(ee2_state) self.assertTrue(last_updated < job.last_updated) @mock.patch(CLIENTS, get_mock_client) From 6bbd36d64b0ee562d74b6c016f7c3bcf98eb4a81 Mon Sep 17 00:00:00 2001 From: n1mus <709030+n1mus@users.noreply.github.com> Date: Thu, 19 May 2022 13:50:47 -0700 Subject: [PATCH 10/10] rebase develop and update docs --- docs/design/job_architecture.md | 18 +++++++++++++----- src/biokbase/narrative/tests/test_jobcomm.py | 12 ++++++------ 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/docs/design/job_architecture.md b/docs/design/job_architecture.md index ae41d08abc..9e4901d441 100644 --- a/docs/design/job_architecture.md +++ b/docs/design/job_architecture.md @@ -315,10 +315,16 @@ Bundled job status information for one or more jobs, keyed by job ID, with the f * `job_id` * `jobState` - see [Job output state](#job-output-state) below for the detailed structure * `outputWidgetInfo` - the parameters to send to output widgets, generated from the app specifications and job output. This is only available for completed jobs and is set to null otherwise. + * `last_checked` - timestamp in ns added at comm sending time -In case of error, the response has instead the keys: +In case of per-input-job-ID error, the response has instead the keys: * `job_id` * `error` - brief message explaining the issue + * `last_checked` - timestamp in ns added at comm sending time + +In case that all valid jobs were filtered out by the `ts` input from the frontend, there will be a job-ID-to-job-state pattern breaking key-value pair keyed by "error" with a state keyed by: + * `error` - "No updated jobs" + * `last_checked` - timestamp in ns added at comm sending time Sample response JSON: ```js @@ -330,11 +336,13 @@ Sample response JSON: "status": "running", "created": 123456789, }, - "outputWidgetInfo": null, // only available for completed jobs + "outputWidgetInfo": null, // only available for completed jobs, + "last_checked": 1652992287210343298, }, "job_id_2": { "job_id": "job_id_2", - "error": "Cannot find job with ID job_id_2" + "error": "Cannot find job with ID job_id_2", + "last_checked": 1652992287210343298, }, } ``` @@ -358,7 +366,6 @@ As sent to browser, includes cell info and run info. The structure below indicat "created": epoch ms, "queued": optional - epoch ms, "finished": optional - epoch ms, - "updated": epoch ms, "terminated_code": optional - int, "error": { // optional "code": int, @@ -371,7 +378,8 @@ As sent to browser, includes cell info and run info. The structure below indicat "tag": string (release, beta, dev), "error_code": optional - int, "errormsg": optional - string, - } + }, + "last_checked": int - ns } ``` diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index 7b6132c0a5..5c00181935 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -752,7 +752,7 @@ def mock_check_job(self_, params): job_state = get_test_job(lookup_id) trim_ee2_state(job_state, params.get("exclude_fields")) if lookup_id in updated_active_ids: - job_state["updated"] += 1 + job_state["created"] += 1 return job_state @@ -766,7 +766,7 @@ def mock_check_jobs(self_, params): trim_ee2_state(job_state, params.get("exclude_fields")) # if job is chosen to be updated, mutate it if job_id in updated_active_ids: - job_state["updated"] += 1 + job_state["created"] += 1 return job_states rq = make_comm_msg(STATUS, job_ids + not_found_ids, False, {"ts": ts}) @@ -779,7 +779,7 @@ def mock_check_jobs(self_, params): for job_id in updated_active_ids } for job_state in expected.values(): - job_state["jobState"]["updated"] += 1 + job_state["jobState"]["created"] += 1 expected[JOB_NOT_FOUND] = { "job_id": JOB_NOT_FOUND, "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND @@ -806,7 +806,7 @@ def mock_check_job(self_, params): job_state = get_test_job(lookup_id) trim_ee2_state(job_state, params.get("exclude_fields")) - job_state["updated"] += 1 + job_state["created"] += 1 return job_state @@ -818,7 +818,7 @@ def mock_check_jobs(self_, params): job_states = get_test_jobs(lookup_ids) for _, job_state in job_states.items(): trim_ee2_state(job_state, params.get("exclude_fields")) - job_state["updated"] += 1 + job_state["created"] += 1 return job_states rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": 0}) @@ -832,7 +832,7 @@ def mock_check_jobs(self_, params): } for job_id, job_state in expected.items(): if job_id in ACTIVE_JOBS: - job_state["jobState"]["updated"] += 1 + job_state["jobState"]["created"] += 1 expected[JOB_NOT_FOUND] = { "job_id": JOB_NOT_FOUND, "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND