diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 178a4a93e..ed59f664b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,9 @@ # execution_engine2 (ee2) release notes ========================================= +## 0.0.3.3 + * Change log appends from update to $push + * Change first log linepos to position 0 instead of 1 + ## 0.0.3.2 * Change 2 db state updates for startjob into 1 * Rework Perms Model diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index 8c355b7db..61fe0137b 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -89,13 +89,13 @@ def _start_local_service(self): @classmethod def _get_collection( - self, - mongo_host: str, - mongo_port: int, - mongo_database: str, - mongo_user: str = None, - mongo_password: str = None, - mongo_authmechanism: str = "DEFAULT", + self, + mongo_host: str, + mongo_port: int, + mongo_database: str, + mongo_user: str = None, + mongo_password: str = None, + mongo_authmechanism: str = "DEFAULT", ): """ Connect to Mongo server and return a tuple with the MongoClient and MongoClient? @@ -232,8 +232,8 @@ def get_jobs(self, job_ids=None, exclude_fields=None, sort_id_ascending=None): raise ValueError("Please input a list type exclude_fields") jobs = ( Job.objects(id__in=job_ids) - .exclude(*exclude_fields) - .order_by("{}_id".format(sort_id_indicator)) + .exclude(*exclude_fields) + .order_by("{}_id".format(sort_id_indicator)) ) else: @@ -417,12 +417,6 @@ def update_job_status(self, job_id, status, msg=None, error_message=None): j.save() - def get_empty_job_log(self): - jl = JobLog() - jl.stored_line_count = 0 - jl.original_line_count = 0 - return jl - @contextmanager def mongo_engine_connection(self): yield self.me_connection @@ -439,7 +433,7 @@ def insert_one(self, doc): self.mongo_collection ].insert_one(doc) except Exception as e: - error_msg = "Connot insert doc\n" + error_msg = "Cannot insert doc\n" error_msg += "ERROR -- {}:\n{}".format( e, "".join(traceback.format_exception(None, e, e.__traceback__)) ) @@ -447,9 +441,35 @@ def insert_one(self, doc): return rec.inserted_id + def _push_job_logs(self, log_lines: JobLog, job_id: str, record_count: int): + """ append a list of job logs, and update the record count """ + + update_filter = {"_id": ObjectId(job_id)} + push_op = {"lines": {"$each": log_lines}} + + set_op = { + "original_line_count": record_count, + "stored_line_count": record_count, + "updated": time.time(), + } + update = {"$push": push_op, "$set": set_op} + with self.pymongo_client(self.mongo_collection) as pymongo_client: + job_col = pymongo_client[self.mongo_database][self.mongo_collection] + try: + job_col.update_one(update_filter, update, upsert=False) + except Exception as e: + error_msg = "Cannot update doc\n ERROR -- {}:\n{}".format( + e, "".join(traceback.format_exception(None, e, e.__traceback__)) + ) + raise ValueError(error_msg) + + slc = job_col.find_one({"_id": ObjectId(job_id)}).get("stored_line_count") + + return slc + def update_one(self, doc, job_id): """ - update existing records + update existing records or create if they do not exist https://docs.mongodb.com/manual/reference/operator/update/set/ """ with self.pymongo_client(self.mongo_collection) as pymongo_client: diff --git a/lib/execution_engine2/sdk/EE2Logs.py b/lib/execution_engine2/sdk/EE2Logs.py index 846bf0ed1..daca2347e 100644 --- a/lib/execution_engine2/sdk/EE2Logs.py +++ b/lib/execution_engine2/sdk/EE2Logs.py @@ -1,4 +1,3 @@ -import time from enum import Enum from typing import Dict, NamedTuple @@ -6,6 +5,10 @@ from lib.execution_engine2.exceptions import RecordNotFoundException +# if TYPE_CHECKING: +# from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner + + class JobPermissions(Enum): READ = "r" WRITE = "w" @@ -20,44 +23,63 @@ class AddLogResult(NamedTuple): class EE2Logs: def __init__(self, sdkmr): self.sdkmr = sdkmr + self.mongo_util = self.sdkmr.get_mongo_util() - @staticmethod - def _create_new_log(pk): - jl = JLModel() - jl.primary_key = pk - jl.original_line_count = 0 - jl.stored_line_count = 0 - jl.lines = [] - return jl - - def _add_job_logs_helper(self, ee2_log: JLModel, log_lines: list): - """ - :param ee2_log: The mongo ee2_log to operate on - :param log_lines: The lines to add to this log - :return: - """ - original_line_count = ee2_log.get("original_line_count") + def _format_job_logs(self, record_position, log_lines): + log_lines_formatted = [] for input_line in log_lines: - original_line_count += 1 + record_position += 1 ll = LogLines() ll.error = int(input_line.get("is_error", 0)) == 1 - ll.linepos = original_line_count - + ll.linepos = record_position ts = input_line.get("ts") if ts is not None: ts = self.sdkmr.check_and_convert_time(ts, assign_default_time=True) ll.ts = ts - ll.line = input_line.get("line") ll.validate() - ee2_log["lines"].append(ll.to_mongo().to_dict()) + log_lines_formatted.append(ll.to_mongo().to_dict()) - ee2_log["updated"] = time.time() - ee2_log["original_line_count"] = original_line_count - ee2_log["stored_line_count"] = original_line_count + return log_lines_formatted - return ee2_log + def _create_new_log(self, pk, log_lines: list): + """ + :param ee2_log: The mongo ee2_log to operate on + :param log_lines: The lines to add to this log + :return: + """ + with self.mongo_util.mongo_engine_connection(): + jl = JLModel() + jl.primary_key = pk + jl.original_line_count = 0 + jl.stored_line_count = 0 + jl.lines = self._format_job_logs(record_position=-1, log_lines=log_lines) + jl.original_line_count = jl.stored_line_count = len(log_lines) + jl.save() + return jl + + def _add_first_logs(self, log_lines, job_id): + """ + Initialize the log since it doesn't exist + :param log_lines: + :param job_id: + :return: + """ + self.sdkmr.logger.debug(f"About to create new log record for {job_id}") + log = self._create_new_log(pk=job_id, log_lines=log_lines) + return AddLogResult(success=True, stored_line_count=log.stored_line_count) + + def _add_subsequent_logs(self, job_log, log_lines): + """ Add logs to an existing log entry """ + formatted_logs = self._format_job_logs( + record_position=job_log["stored_line_count"] - 1, log_lines=log_lines + ) + record_count = int(job_log["stored_line_count"]) + len(formatted_logs) + slc = self.mongo_util._push_job_logs( + formatted_logs, job_id=job_log["_id"], record_count=record_count + ) + return AddLogResult(success=True, stored_line_count=slc) def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult: """ @@ -80,44 +102,21 @@ def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult: :param as_admin: :return: """ - log = {"stored_line_count": -1} - + job_log = {"stored_line_count": -1} self.sdkmr.get_job_with_permission( job_id, JobPermissions.WRITE, as_admin=as_admin ) - + self.sdkmr.logger.debug(f"About to add logs for {job_id}") try: - self.sdkmr.logger.debug(f"About to add logs for {job_id}") - mongo_util = self.sdkmr.get_mongo_util() try: - log = mongo_util.get_job_log_pymongo(job_id) + job_log = self.mongo_util.get_job_log_pymongo(job_id) except RecordNotFoundException: - # What really should happen is the log is created and then SAVED, and then - # Retrieved again, and then we should use native log line append to the record - # INstead of updating the entire record.. And then update the line positions and line counts - # upon successfull appending of ALL logs - log = self._create_new_log(pk=job_id).to_mongo().to_dict() - - log = self._add_job_logs_helper(ee2_log=log, log_lines=log_lines) - - try: - with mongo_util.pymongo_client( - self.sdkmr.config["mongo-logs-collection"] - ): - mongo_util.update_one(log, str(log.get("_id"))) - except Exception as e: - self.sdkmr.logger.error(e) - ll = [{"line": f"{e}", "is_error": 1}] - log = self._add_job_logs_helper(ee2_log=log, log_lines=ll) - olc = mongo_util.update_one(log, str(log.get("_id"))) - return olc - + return self._add_first_logs(log_lines=log_lines, job_id=job_id) + return self._add_subsequent_logs(job_log, log_lines) + except Exception as e: + self.sdkmr.logger.error(e) return AddLogResult( - success=False, stored_line_count=log["stored_line_count"] - ) - except Exception: - return AddLogResult( - success=False, stored_line_count=log["stored_line_count"] + success=False, stored_line_count=job_log["stored_line_count"] ) def _get_job_logs(self, job_id, skip_lines, limit=None) -> Dict: @@ -146,12 +145,10 @@ def _get_job_logs(self, job_id, skip_lines, limit=None) -> Dict: :return: """ - log = self.sdkmr.get_mongo_util().get_job_log_pymongo(job_id) - + log = self.mongo_util.get_job_log_pymongo(job_id) lines = [] last_line_number = 0 count = len(log.get("lines", [])) - for log_line in log.get("lines", []): # type: LogLines if skip_lines and int(skip_lines) >= log_line.get("linepos", 0): diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py index c26963681..6817bd324 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py @@ -87,8 +87,8 @@ def test_add_job_logs_ok(self, rq_mock): test_line = ori_lines[0] - self.assertEqual(test_line["line"], "Hello world") - self.assertEqual(test_line["linepos"], 1) + self.assertEquals(test_line["line"], "Hello world") + self.assertEqual(test_line["linepos"], 0) self.assertEqual(test_line["error"], False) # add job log @@ -98,7 +98,7 @@ def test_add_job_logs_ok(self, rq_mock): ] runner.add_job_logs(job_id=job_id, log_lines=lines) - + # log = self.mongo_util.get_job_log(job_id=job_id) self.assertTrue(log.updated) self.assertTrue(ori_updated_time < log.updated) @@ -106,26 +106,26 @@ def test_add_job_logs_ok(self, rq_mock): self.assertEqual(log.stored_line_count, 3) ori_lines = log.lines self.assertEqual(len(ori_lines), 3) - + # # original line test_line = ori_lines[0] - print(test_line) - self.assertEqual(test_line["line"], "Hello world") - self.assertEqual(test_line["linepos"], 1) - self.assertEqual(test_line["error"], 0) + print(ori_lines) + self.assertEqual("Hello world", test_line["line"]) + self.assertEqual(0, test_line["linepos"]) + self.assertEqual(0, test_line["error"]) # new line test_line = ori_lines[1] self.assertEqual(test_line["line"], "Hello Kbase") - self.assertEqual(test_line["linepos"], 2) + self.assertEqual(test_line["linepos"], 1) self.assertEqual(test_line["error"], 1) test_line = ori_lines[2] self.assertEqual(test_line["line"], "Hello Wrold Kbase") - self.assertEqual(test_line["linepos"], 3) + self.assertEqual(test_line["linepos"], 2) self.assertEqual(test_line["error"], 0) self.mongo_util.get_job(job_id=job_id).delete() @@ -133,3 +133,6 @@ def test_add_job_logs_ok(self, rq_mock): self.mongo_util.get_job_log(job_id=job_id).delete() self.assertEqual(ori_job_log_count, JobLog.objects.count()) + + +# diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py index 6ce5f6107..3b0db28b1 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py @@ -374,6 +374,7 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): log_lines = log["lines"] for i, inserted_line in enumerate(log_lines): self.assertEqual(inserted_line["line"], lines[i]["line"]) + self.assertEqual(inserted_line["linepos"], i) line1 = { "error": False, @@ -395,6 +396,7 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): "line": "This is the read deal4", "ts": str(datetime.now().timestamp()), } + input_lines2 = [line1, line2, line3, line4] for line in input_lines2: @@ -406,19 +408,24 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): logging.info( f"After inserting timestamped logs, log position is now {log_pos2}" ) + print("Comparing ", log_pos_1, log_pos2, log_pos2 - len(lines)) + self.assertEqual(log_pos_1, log_pos2 - len(lines)) log = runner.view_job_logs(job_id=job_id, skip_lines=None) log_lines = log["lines"] - print("About to dump log") - print(json.dumps(log)) + print("Before SkipLines Test") for i, inserted_line in enumerate(log_lines): if i < log_pos_1: continue - + print("Checking", i) + print("Checking to see if", inserted_line["line"]) + print(input_lines2[i - log_pos_1]["line"]) self.assertEqual(inserted_line["line"], input_lines2[i - log_pos_1]["line"]) + print("SUCCESS") time_input = input_lines2[i - log_pos_1]["ts"] + print("Time input is", time_input) if isinstance(time_input, str): if time_input.replace(".", "", 1).isdigit(): time_input = ( @@ -431,13 +438,18 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): elif isinstance(time_input, int): time_input = time_input / 1000.0 + print("Time 2 is", time_input) self.assertEqual(inserted_line["ts"], int(time_input * 1000)) error1 = line["error"] + error2 = input_lines2[i - log_pos_1]["error"] + + print(line) + print(input_lines2[i - log_pos_1]) self.assertEqual(error1, error2) - # TODO IMPLEMENT SKIPLINES AND TEST + print("SkipLines Test") log = runner.view_job_logs(job_id=job_id, skip_lines=1) self.assertEqual(log["lines"][0]["linepos"], 2) @@ -449,20 +461,24 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): # Test limit log = runner.view_job_logs(job_id=job_id, limit=2) self.assertEqual(len(log["lines"]), 2) - self.assertEqual(log["lines"][0]["linepos"], 1) - self.assertEqual(log["lines"][-1]["linepos"], 2) - self.assertEqual(log["last_line_number"], 2) + self.assertEqual(log["lines"][0]["linepos"], 0) + self.assertEqual(log["lines"][-1]["linepos"], 1) + self.assertEqual(log["last_line_number"], 1) - log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=2) - self.assertEqual(len(log["lines"]), 3) - self.assertEqual(log["lines"][0]["linepos"], 3) - self.assertEqual(log["lines"][-1]["linepos"], 5) - self.assertEqual(log["last_line_number"], 5) + log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=0) + self.assertEqual( + 3, len(log["lines"]), + ) + self.assertEqual(0, log["lines"][0]["linepos"]) + self.assertEqual(2, log["lines"][-1]["linepos"]) + self.assertEqual(log["last_line_number"], 2) - log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=7) - self.assertEqual(len(log["lines"]), 1) - self.assertEqual(log["lines"][0]["linepos"], 8) - self.assertEqual(log["last_line_number"], 8) + log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=5) + self.assertEqual( + 2, len(log["lines"]), + ) + self.assertEqual(6, log["lines"][0]["linepos"]) + self.assertEqual(7, log["last_line_number"]) log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=8) self.assertEqual(log["lines"], []) diff --git a/test/tests_for_sdkmr/ee2_load_test.py b/test/tests_for_sdkmr/ee2_load_test.py index 8bd01b27e..cfdc9b918 100644 --- a/test/tests_for_sdkmr/ee2_load_test.py +++ b/test/tests_for_sdkmr/ee2_load_test.py @@ -629,7 +629,7 @@ def test_get_job_logs_stress(self): for job_line in job_lines: job_line = job_line[0]["lines"][0] self.assertEqual(job_line["line"], "hello ee2") - self.assertEqual(job_line["linepos"], 1) + self.assertEqual(job_line["linepos"], 0) self.assertEqual(job_line["is_error"], 1) self.assertEqual(job_line["ts"], int(ts * 1000)) @@ -682,7 +682,7 @@ def test_add_job_logs_stress(self): ] self.assertEqual( - job_lines["last_line_number"], thread_count + job_lines["last_line_number"], thread_count - 1 ) # exam total number of job lines created by add_job_logs # exam each line created by add_job_logs @@ -694,7 +694,7 @@ def test_add_job_logs_stress(self): self.assertEqual(line["is_error"], 1) self.assertEqual(line["ts"], int(ts * 1000)) line_pos.append(line["linepos"]) - self.assertCountEqual(line_pos, list(range(1, thread_count + 1))) + self.assertCountEqual(line_pos, list(range(0, thread_count))) jobs = self.mongo_util.get_jobs(job_ids=[job_id])