From 4d95ffb334dd0fc7a66b903cb5e68a9912fc59ae Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 5 Jun 2020 10:49:29 -0500 Subject: [PATCH 1/7] First attempt --- lib/execution_engine2/db/MongoUtil.py | 49 ++++++++++++-- lib/execution_engine2/sdk/EE2Logs.py | 94 ++++++++++++++++++--------- 2 files changed, 105 insertions(+), 38 deletions(-) diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index 8c355b7db..e584bc83a 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -417,11 +417,7 @@ def update_job_status(self, job_id, status, msg=None, error_message=None): j.save() - def get_empty_job_log(self): - jl = JobLog() - jl.stored_line_count = 0 - jl.original_line_count = 0 - return jl + @contextmanager def mongo_engine_connection(self): @@ -439,7 +435,7 @@ def insert_one(self, doc): self.mongo_collection ].insert_one(doc) except Exception as e: - error_msg = "Connot insert doc\n" + error_msg = "Cannot insert doc\n" error_msg += "ERROR -- {}:\n{}".format( e, "".join(traceback.format_exception(None, e, e.__traceback__)) ) @@ -447,9 +443,48 @@ def insert_one(self, doc): return rec.inserted_id + def _add_job_logs(self, log_lines, job_id): + """ + + :param log_lines: + :param job_id: + :return: + """ + + update_filter = {"_id": ObjectId(job_id)} + push_op = {'lines': log_lines['lines']} + + + set_op = {'original_line_count': log_lines['original_line_count'], + 'stored_line_count': log_lines['original_line_count'], + 'updated': time.time()} + + print("set_op op is", set_op) + + update = {"$pushAll": push_op, "$set": set_op} + + print("Update is", update) + + print(f"About to do the subsequent log record 22") + with self.pymongo_client(self.mongo_collection) as pymongo_client: + print(f"About to do the subsequent log record 23") + + job_col = pymongo_client[self.mongo_database][self.mongo_collection] + # try: + print(f"About to return update", update_filter, update) + job_col.update_one(update_filter, update, upsert=False) + # except Exception as e: + # error_msg = "Cannot update doc\n ERROR -- {}:\n{}".format( + # e, "".join(traceback.format_exception(None, e, e.__traceback__)) + # ) + # raise ValueError(error_msg) + + slc = job_col.find_one({"_id": ObjectId(job_id)}).get("stored_line_count") + print(f"About to return slc {slc}" ) + return slc def update_one(self, doc, job_id): """ - update existing records + update existing records or create if they do not exist https://docs.mongodb.com/manual/reference/operator/update/set/ """ with self.pymongo_client(self.mongo_collection) as pymongo_client: diff --git a/lib/execution_engine2/sdk/EE2Logs.py b/lib/execution_engine2/sdk/EE2Logs.py index 846bf0ed1..b57400269 100644 --- a/lib/execution_engine2/sdk/EE2Logs.py +++ b/lib/execution_engine2/sdk/EE2Logs.py @@ -6,6 +6,10 @@ from lib.execution_engine2.exceptions import RecordNotFoundException +# if TYPE_CHECKING: +# from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner + + class JobPermissions(Enum): READ = "r" WRITE = "w" @@ -20,6 +24,7 @@ class AddLogResult(NamedTuple): class EE2Logs: def __init__(self, sdkmr): self.sdkmr = sdkmr + self.mongo_util = self.sdkmr.get_mongo_util() @staticmethod def _create_new_log(pk): @@ -37,9 +42,13 @@ def _add_job_logs_helper(self, ee2_log: JLModel, log_lines: list): :return: """ original_line_count = ee2_log.get("original_line_count") + # Empty Logs Case + if original_line_count == 0: + original_line_count = -1 for input_line in log_lines: original_line_count += 1 + print("About to set linepos to", original_line_count) ll = LogLines() ll.error = int(input_line.get("is_error", 0)) == 1 ll.linepos = original_line_count @@ -54,11 +63,43 @@ def _add_job_logs_helper(self, ee2_log: JLModel, log_lines: list): ee2_log["lines"].append(ll.to_mongo().to_dict()) ee2_log["updated"] = time.time() - ee2_log["original_line_count"] = original_line_count - ee2_log["stored_line_count"] = original_line_count + ee2_log["original_line_count"] = original_line_count + 1 + ee2_log["stored_line_count"] = original_line_count + 1 return ee2_log + def _add_first_logs(self, log_lines, job_id): + """ + Initialize the log since it doesn't exist + :param log_lines: + :param job_id: + :return: + """ + self.sdkmr.logger.info(f"About to create new log record") + + new_log = self._create_new_log(pk=job_id).to_mongo().to_dict() + log = self._add_job_logs_helper(ee2_log=new_log, log_lines=log_lines) + try: + self.mongo_util.update_one(log, str(log.get("_id"))) + return AddLogResult( + success=True, stored_line_count=log["stored_line_count"] + ) + except Exception as e: + self.sdkmr.logger.error(e) + ll = [{"line": f"{e}", "is_error": 1}] + log = self._add_job_logs_helper(ee2_log=log, log_lines=ll) + self.mongo_util.update_one(log, str(log.get("_id"))) + return AddLogResult( + success=False, stored_line_count=log["stored_line_count"] + ) + + def _add_subsequent_logs(self, formatted_logs, job_id): + self.sdkmr.logger.info(f"About to create subsequent log record") + slc = self.mongo_util._add_job_logs(formatted_logs, job_id=job_id) + return AddLogResult( + success=True, stored_line_count=slc + ) + def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult: """ #Authorization Required : Ability to read and write to the workspace @@ -85,40 +126,30 @@ def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult: self.sdkmr.get_job_with_permission( job_id, JobPermissions.WRITE, as_admin=as_admin ) + self.sdkmr.logger.info(f"About to add logs for {job_id}") try: - self.sdkmr.logger.debug(f"About to add logs for {job_id}") - mongo_util = self.sdkmr.get_mongo_util() try: - log = mongo_util.get_job_log_pymongo(job_id) + log = self.mongo_util.get_job_log_pymongo(job_id) + from pprint import pprint + print("About to insert") + pprint(log) + pprint(log_lines) + except RecordNotFoundException: - # What really should happen is the log is created and then SAVED, and then - # Retrieved again, and then we should use native log line append to the record - # INstead of updating the entire record.. And then update the line positions and line counts - # upon successfull appending of ALL logs - log = self._create_new_log(pk=job_id).to_mongo().to_dict() + print("not found") + return self._add_first_logs(log_lines=log_lines, job_id=job_id) - log = self._add_job_logs_helper(ee2_log=log, log_lines=log_lines) + formatted_logs = self._add_job_logs_helper(ee2_log=log, log_lines=log_lines) - try: - with mongo_util.pymongo_client( - self.sdkmr.config["mongo-logs-collection"] - ): - mongo_util.update_one(log, str(log.get("_id"))) - except Exception as e: - self.sdkmr.logger.error(e) - ll = [{"line": f"{e}", "is_error": 1}] - log = self._add_job_logs_helper(ee2_log=log, log_lines=ll) - olc = mongo_util.update_one(log, str(log.get("_id"))) - return olc + return self._add_subsequent_logs(formatted_logs, job_id=job_id) - return AddLogResult( - success=False, stored_line_count=log["stored_line_count"] - ) - except Exception: - return AddLogResult( - success=False, stored_line_count=log["stored_line_count"] - ) + except Exception as e: + self.sdkmr.logger.error(e) + raise(e) + # return AddLogResult( + # success=False, stored_line_count=log["stored_line_count"] + # ) def _get_job_logs(self, job_id, skip_lines, limit=None) -> Dict: """ @@ -146,8 +177,9 @@ def _get_job_logs(self, job_id, skip_lines, limit=None) -> Dict: :return: """ - log = self.sdkmr.get_mongo_util().get_job_log_pymongo(job_id) - + log = self.mongo_util.get_job_log_pymongo(job_id) + from pprint import pprint + pprint(log) lines = [] last_line_number = 0 count = len(log.get("lines", [])) From 532ec159c7d0c289102e04951730691ea03ad90a Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 5 Jun 2020 13:06:46 -0500 Subject: [PATCH 2/7] Fix bugs --- lib/execution_engine2/db/MongoUtil.py | 64 ++++++-------- lib/execution_engine2/sdk/EE2Logs.py | 117 +++++++++----------------- 2 files changed, 68 insertions(+), 113 deletions(-) diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index e584bc83a..44398b9ae 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -89,13 +89,13 @@ def _start_local_service(self): @classmethod def _get_collection( - self, - mongo_host: str, - mongo_port: int, - mongo_database: str, - mongo_user: str = None, - mongo_password: str = None, - mongo_authmechanism: str = "DEFAULT", + self, + mongo_host: str, + mongo_port: int, + mongo_database: str, + mongo_user: str = None, + mongo_password: str = None, + mongo_authmechanism: str = "DEFAULT", ): """ Connect to Mongo server and return a tuple with the MongoClient and MongoClient? @@ -232,8 +232,8 @@ def get_jobs(self, job_ids=None, exclude_fields=None, sort_id_ascending=None): raise ValueError("Please input a list type exclude_fields") jobs = ( Job.objects(id__in=job_ids) - .exclude(*exclude_fields) - .order_by("{}_id".format(sort_id_indicator)) + .exclude(*exclude_fields) + .order_by("{}_id".format(sort_id_indicator)) ) else: @@ -417,8 +417,6 @@ def update_job_status(self, job_id, status, msg=None, error_message=None): j.save() - - @contextmanager def mongo_engine_connection(self): yield self.me_connection @@ -443,7 +441,7 @@ def insert_one(self, doc): return rec.inserted_id - def _add_job_logs(self, log_lines, job_id): + def _add_job_logs(self, log_lines: JobLog, job_id: str, record_count: int): """ :param log_lines: @@ -452,36 +450,28 @@ def _add_job_logs(self, log_lines, job_id): """ update_filter = {"_id": ObjectId(job_id)} - push_op = {'lines': log_lines['lines']} - - - set_op = {'original_line_count': log_lines['original_line_count'], - 'stored_line_count': log_lines['original_line_count'], - 'updated': time.time()} - - print("set_op op is", set_op) + push_op = {"lines": log_lines} + set_op = { + "original_line_count": record_count, + "stored_line_count": record_count, + "updated": time.time(), + } update = {"$pushAll": push_op, "$set": set_op} - - print("Update is", update) - - print(f"About to do the subsequent log record 22") with self.pymongo_client(self.mongo_collection) as pymongo_client: - print(f"About to do the subsequent log record 23") - job_col = pymongo_client[self.mongo_database][self.mongo_collection] - # try: - print(f"About to return update", update_filter, update) - job_col.update_one(update_filter, update, upsert=False) - # except Exception as e: - # error_msg = "Cannot update doc\n ERROR -- {}:\n{}".format( - # e, "".join(traceback.format_exception(None, e, e.__traceback__)) - # ) - # raise ValueError(error_msg) - - slc = job_col.find_one({"_id": ObjectId(job_id)}).get("stored_line_count") - print(f"About to return slc {slc}" ) + try: + job_col.update_one(update_filter, update, upsert=False) + except Exception as e: + error_msg = "Cannot update doc\n ERROR -- {}:\n{}".format( + e, "".join(traceback.format_exception(None, e, e.__traceback__)) + ) + raise ValueError(error_msg) + + slc = job_col.find_one({"_id": ObjectId(job_id)}).get("stored_line_count") + print(f"About to return slc {slc}") return slc + def update_one(self, doc, job_id): """ update existing records or create if they do not exist diff --git a/lib/execution_engine2/sdk/EE2Logs.py b/lib/execution_engine2/sdk/EE2Logs.py index b57400269..b9234d78c 100644 --- a/lib/execution_engine2/sdk/EE2Logs.py +++ b/lib/execution_engine2/sdk/EE2Logs.py @@ -1,4 +1,3 @@ -import time from enum import Enum from typing import Dict, NamedTuple @@ -26,47 +25,39 @@ def __init__(self, sdkmr): self.sdkmr = sdkmr self.mongo_util = self.sdkmr.get_mongo_util() - @staticmethod - def _create_new_log(pk): - jl = JLModel() - jl.primary_key = pk - jl.original_line_count = 0 - jl.stored_line_count = 0 - jl.lines = [] - return jl - - def _add_job_logs_helper(self, ee2_log: JLModel, log_lines: list): - """ - :param ee2_log: The mongo ee2_log to operate on - :param log_lines: The lines to add to this log - :return: - """ - original_line_count = ee2_log.get("original_line_count") - # Empty Logs Case - if original_line_count == 0: - original_line_count = -1 + def _format_job_logs(self, record_position, log_lines): + log_lines_formatted = [] for input_line in log_lines: - original_line_count += 1 - print("About to set linepos to", original_line_count) + record_position += 1 ll = LogLines() ll.error = int(input_line.get("is_error", 0)) == 1 - ll.linepos = original_line_count - + ll.linepos = record_position ts = input_line.get("ts") if ts is not None: ts = self.sdkmr.check_and_convert_time(ts, assign_default_time=True) ll.ts = ts - ll.line = input_line.get("line") ll.validate() - ee2_log["lines"].append(ll.to_mongo().to_dict()) + log_lines_formatted.append(ll.to_mongo().to_dict()) - ee2_log["updated"] = time.time() - ee2_log["original_line_count"] = original_line_count + 1 - ee2_log["stored_line_count"] = original_line_count + 1 + return log_lines_formatted - return ee2_log + def _create_new_log(self, pk, log_lines: list): + """ + :param ee2_log: The mongo ee2_log to operate on + :param log_lines: The lines to add to this log + :return: + """ + with self.mongo_util.mongo_engine_connection(): + jl = JLModel() + jl.primary_key = pk + jl.original_line_count = 0 + jl.stored_line_count = 0 + jl.lines = self._format_job_logs(record_position=-1, log_lines=log_lines) + jl.original_line_count = jl.stored_line_count = len(log_lines) + jl.save() + return jl def _add_first_logs(self, log_lines, job_id): """ @@ -75,30 +66,20 @@ def _add_first_logs(self, log_lines, job_id): :param job_id: :return: """ - self.sdkmr.logger.info(f"About to create new log record") - - new_log = self._create_new_log(pk=job_id).to_mongo().to_dict() - log = self._add_job_logs_helper(ee2_log=new_log, log_lines=log_lines) - try: - self.mongo_util.update_one(log, str(log.get("_id"))) - return AddLogResult( - success=True, stored_line_count=log["stored_line_count"] - ) - except Exception as e: - self.sdkmr.logger.error(e) - ll = [{"line": f"{e}", "is_error": 1}] - log = self._add_job_logs_helper(ee2_log=log, log_lines=ll) - self.mongo_util.update_one(log, str(log.get("_id"))) - return AddLogResult( - success=False, stored_line_count=log["stored_line_count"] - ) - - def _add_subsequent_logs(self, formatted_logs, job_id): - self.sdkmr.logger.info(f"About to create subsequent log record") - slc = self.mongo_util._add_job_logs(formatted_logs, job_id=job_id) - return AddLogResult( - success=True, stored_line_count=slc + self.sdkmr.logger.debug(f"About to create new log record for {job_id}") + log = self._create_new_log(pk=job_id, log_lines=log_lines) + return AddLogResult(success=True, stored_line_count=log.stored_line_count) + + def _add_subsequent_logs(self, job_log, log_lines): + """ Add logs to an existing log entry """ + formatted_logs = self._format_job_logs( + record_position=job_log["stored_line_count"] - 1, log_lines=log_lines ) + record_count = int(job_log["stored_line_count"]) + len(formatted_logs) + slc = self.mongo_util._add_job_logs( + formatted_logs, job_id=job_log["_id"], record_count=record_count + ) + return AddLogResult(success=True, stored_line_count=slc) def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult: """ @@ -121,35 +102,22 @@ def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult: :param as_admin: :return: """ - log = {"stored_line_count": -1} - + job_log = {"stored_line_count": -1} self.sdkmr.get_job_with_permission( job_id, JobPermissions.WRITE, as_admin=as_admin ) - self.sdkmr.logger.info(f"About to add logs for {job_id}") - + self.sdkmr.logger.debug(f"About to add logs for {job_id}") try: try: - log = self.mongo_util.get_job_log_pymongo(job_id) - from pprint import pprint - print("About to insert") - pprint(log) - pprint(log_lines) - + job_log = self.mongo_util.get_job_log_pymongo(job_id) except RecordNotFoundException: - print("not found") return self._add_first_logs(log_lines=log_lines, job_id=job_id) - - formatted_logs = self._add_job_logs_helper(ee2_log=log, log_lines=log_lines) - - return self._add_subsequent_logs(formatted_logs, job_id=job_id) - + return self._add_subsequent_logs(job_log, log_lines) except Exception as e: self.sdkmr.logger.error(e) - raise(e) - # return AddLogResult( - # success=False, stored_line_count=log["stored_line_count"] - # ) + return AddLogResult( + success=False, stored_line_count=job_log["stored_line_count"] + ) def _get_job_logs(self, job_id, skip_lines, limit=None) -> Dict: """ @@ -178,12 +146,9 @@ def _get_job_logs(self, job_id, skip_lines, limit=None) -> Dict: """ log = self.mongo_util.get_job_log_pymongo(job_id) - from pprint import pprint - pprint(log) lines = [] last_line_number = 0 count = len(log.get("lines", [])) - for log_line in log.get("lines", []): # type: LogLines if skip_lines and int(skip_lines) >= log_line.get("linepos", 0): From d83dc9304440d0e772405354e59a80985d82a82c Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 5 Jun 2020 13:18:22 -0500 Subject: [PATCH 3/7] Fix tests --- lib/execution_engine2/db/MongoUtil.py | 9 ++------ lib/execution_engine2/sdk/EE2Logs.py | 2 +- .../ee2_SDKMethodRunner_EE2Logs_test.py | 23 +++++++++++-------- .../ee2_SDKMethodRunner_test.py | 20 ++++++++++++---- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index 44398b9ae..d2834d72a 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -441,13 +441,8 @@ def insert_one(self, doc): return rec.inserted_id - def _add_job_logs(self, log_lines: JobLog, job_id: str, record_count: int): - """ - - :param log_lines: - :param job_id: - :return: - """ + def _push_job_logs(self, log_lines: JobLog, job_id: str, record_count: int): + """ append a list of job logs, and update the record count """ update_filter = {"_id": ObjectId(job_id)} push_op = {"lines": log_lines} diff --git a/lib/execution_engine2/sdk/EE2Logs.py b/lib/execution_engine2/sdk/EE2Logs.py index b9234d78c..daca2347e 100644 --- a/lib/execution_engine2/sdk/EE2Logs.py +++ b/lib/execution_engine2/sdk/EE2Logs.py @@ -76,7 +76,7 @@ def _add_subsequent_logs(self, job_log, log_lines): record_position=job_log["stored_line_count"] - 1, log_lines=log_lines ) record_count = int(job_log["stored_line_count"]) + len(formatted_logs) - slc = self.mongo_util._add_job_logs( + slc = self.mongo_util._push_job_logs( formatted_logs, job_id=job_log["_id"], record_count=record_count ) return AddLogResult(success=True, stored_line_count=slc) diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py index c26963681..6817bd324 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py @@ -87,8 +87,8 @@ def test_add_job_logs_ok(self, rq_mock): test_line = ori_lines[0] - self.assertEqual(test_line["line"], "Hello world") - self.assertEqual(test_line["linepos"], 1) + self.assertEquals(test_line["line"], "Hello world") + self.assertEqual(test_line["linepos"], 0) self.assertEqual(test_line["error"], False) # add job log @@ -98,7 +98,7 @@ def test_add_job_logs_ok(self, rq_mock): ] runner.add_job_logs(job_id=job_id, log_lines=lines) - + # log = self.mongo_util.get_job_log(job_id=job_id) self.assertTrue(log.updated) self.assertTrue(ori_updated_time < log.updated) @@ -106,26 +106,26 @@ def test_add_job_logs_ok(self, rq_mock): self.assertEqual(log.stored_line_count, 3) ori_lines = log.lines self.assertEqual(len(ori_lines), 3) - + # # original line test_line = ori_lines[0] - print(test_line) - self.assertEqual(test_line["line"], "Hello world") - self.assertEqual(test_line["linepos"], 1) - self.assertEqual(test_line["error"], 0) + print(ori_lines) + self.assertEqual("Hello world", test_line["line"]) + self.assertEqual(0, test_line["linepos"]) + self.assertEqual(0, test_line["error"]) # new line test_line = ori_lines[1] self.assertEqual(test_line["line"], "Hello Kbase") - self.assertEqual(test_line["linepos"], 2) + self.assertEqual(test_line["linepos"], 1) self.assertEqual(test_line["error"], 1) test_line = ori_lines[2] self.assertEqual(test_line["line"], "Hello Wrold Kbase") - self.assertEqual(test_line["linepos"], 3) + self.assertEqual(test_line["linepos"], 2) self.assertEqual(test_line["error"], 0) self.mongo_util.get_job(job_id=job_id).delete() @@ -133,3 +133,6 @@ def test_add_job_logs_ok(self, rq_mock): self.mongo_util.get_job_log(job_id=job_id).delete() self.assertEqual(ori_job_log_count, JobLog.objects.count()) + + +# diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py index 6ce5f6107..c9cde624c 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py @@ -374,6 +374,7 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): log_lines = log["lines"] for i, inserted_line in enumerate(log_lines): self.assertEqual(inserted_line["line"], lines[i]["line"]) + self.assertEqual(inserted_line["linepos"], i) line1 = { "error": False, @@ -395,6 +396,7 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): "line": "This is the read deal4", "ts": str(datetime.now().timestamp()), } + input_lines2 = [line1, line2, line3, line4] for line in input_lines2: @@ -406,19 +408,24 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): logging.info( f"After inserting timestamped logs, log position is now {log_pos2}" ) + print("Comparing ", log_pos_1, log_pos2, log_pos2 - len(lines)) + self.assertEqual(log_pos_1, log_pos2 - len(lines)) log = runner.view_job_logs(job_id=job_id, skip_lines=None) log_lines = log["lines"] - print("About to dump log") - print(json.dumps(log)) + print("Before SkipLines Test") for i, inserted_line in enumerate(log_lines): if i < log_pos_1: continue - + print("Checking", i) + print("Checking to see if", inserted_line["line"]) + print(input_lines2[i - log_pos_1]["line"]) self.assertEqual(inserted_line["line"], input_lines2[i - log_pos_1]["line"]) + print("SUCCESS") time_input = input_lines2[i - log_pos_1]["ts"] + print("Time input is", time_input) if isinstance(time_input, str): if time_input.replace(".", "", 1).isdigit(): time_input = ( @@ -431,13 +438,18 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): elif isinstance(time_input, int): time_input = time_input / 1000.0 + print("Time 2 is", time_input) self.assertEqual(inserted_line["ts"], int(time_input * 1000)) error1 = line["error"] + error2 = input_lines2[i - log_pos_1]["error"] + + print(line) + print(input_lines2[i - log_pos_1]) self.assertEqual(error1, error2) - # TODO IMPLEMENT SKIPLINES AND TEST + print("SkipLines Test") log = runner.view_job_logs(job_id=job_id, skip_lines=1) self.assertEqual(log["lines"][0]["linepos"], 2) From dde708645223ef8c66e150da50d5ba196e8a52ad Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 5 Jun 2020 13:30:54 -0500 Subject: [PATCH 4/7] Fix tests --- lib/execution_engine2/db/MongoUtil.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index d2834d72a..d9c9775ca 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -464,7 +464,7 @@ def _push_job_logs(self, log_lines: JobLog, job_id: str, record_count: int): raise ValueError(error_msg) slc = job_col.find_one({"_id": ObjectId(job_id)}).get("stored_line_count") - print(f"About to return slc {slc}") + return slc def update_one(self, doc, job_id): From 5caaecdefe54a5732c9087cb7d0a3627aeef1964 Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 5 Jun 2020 14:15:05 -0500 Subject: [PATCH 5/7] Fix pushall with push each --- lib/execution_engine2/db/MongoUtil.py | 4 ++-- test/tests_for_sdkmr/ee2_load_test.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index d9c9775ca..61fe0137b 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -445,14 +445,14 @@ def _push_job_logs(self, log_lines: JobLog, job_id: str, record_count: int): """ append a list of job logs, and update the record count """ update_filter = {"_id": ObjectId(job_id)} - push_op = {"lines": log_lines} + push_op = {"lines": {"$each": log_lines}} set_op = { "original_line_count": record_count, "stored_line_count": record_count, "updated": time.time(), } - update = {"$pushAll": push_op, "$set": set_op} + update = {"$push": push_op, "$set": set_op} with self.pymongo_client(self.mongo_collection) as pymongo_client: job_col = pymongo_client[self.mongo_database][self.mongo_collection] try: diff --git a/test/tests_for_sdkmr/ee2_load_test.py b/test/tests_for_sdkmr/ee2_load_test.py index 8bd01b27e..cfdc9b918 100644 --- a/test/tests_for_sdkmr/ee2_load_test.py +++ b/test/tests_for_sdkmr/ee2_load_test.py @@ -629,7 +629,7 @@ def test_get_job_logs_stress(self): for job_line in job_lines: job_line = job_line[0]["lines"][0] self.assertEqual(job_line["line"], "hello ee2") - self.assertEqual(job_line["linepos"], 1) + self.assertEqual(job_line["linepos"], 0) self.assertEqual(job_line["is_error"], 1) self.assertEqual(job_line["ts"], int(ts * 1000)) @@ -682,7 +682,7 @@ def test_add_job_logs_stress(self): ] self.assertEqual( - job_lines["last_line_number"], thread_count + job_lines["last_line_number"], thread_count - 1 ) # exam total number of job lines created by add_job_logs # exam each line created by add_job_logs @@ -694,7 +694,7 @@ def test_add_job_logs_stress(self): self.assertEqual(line["is_error"], 1) self.assertEqual(line["ts"], int(ts * 1000)) line_pos.append(line["linepos"]) - self.assertCountEqual(line_pos, list(range(1, thread_count + 1))) + self.assertCountEqual(line_pos, list(range(0, thread_count))) jobs = self.mongo_util.get_jobs(job_ids=[job_id]) From e9747f203f709c04f3b810829bfb54d26e5e6c5b Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 5 Jun 2020 16:18:44 -0500 Subject: [PATCH 6/7] Fix tests --- .../ee2_SDKMethodRunner_test.py | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py index c9cde624c..3b0db28b1 100644 --- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py +++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test.py @@ -461,20 +461,24 @@ def test_run_job_and_add_log(self, rq_mock, condor_mock): # Test limit log = runner.view_job_logs(job_id=job_id, limit=2) self.assertEqual(len(log["lines"]), 2) - self.assertEqual(log["lines"][0]["linepos"], 1) - self.assertEqual(log["lines"][-1]["linepos"], 2) - self.assertEqual(log["last_line_number"], 2) + self.assertEqual(log["lines"][0]["linepos"], 0) + self.assertEqual(log["lines"][-1]["linepos"], 1) + self.assertEqual(log["last_line_number"], 1) - log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=2) - self.assertEqual(len(log["lines"]), 3) - self.assertEqual(log["lines"][0]["linepos"], 3) - self.assertEqual(log["lines"][-1]["linepos"], 5) - self.assertEqual(log["last_line_number"], 5) + log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=0) + self.assertEqual( + 3, len(log["lines"]), + ) + self.assertEqual(0, log["lines"][0]["linepos"]) + self.assertEqual(2, log["lines"][-1]["linepos"]) + self.assertEqual(log["last_line_number"], 2) - log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=7) - self.assertEqual(len(log["lines"]), 1) - self.assertEqual(log["lines"][0]["linepos"], 8) - self.assertEqual(log["last_line_number"], 8) + log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=5) + self.assertEqual( + 2, len(log["lines"]), + ) + self.assertEqual(6, log["lines"][0]["linepos"]) + self.assertEqual(7, log["last_line_number"]) log = runner.view_job_logs(job_id=job_id, limit=3, skip_lines=8) self.assertEqual(log["lines"], []) From d38e3fb02e55cd9f3b7e3f5187ce2fc28d43e28c Mon Sep 17 00:00:00 2001 From: bio-boris Date: Fri, 5 Jun 2020 16:28:57 -0500 Subject: [PATCH 7/7] Update release notes --- RELEASE_NOTES.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 178a4a93e..ed59f664b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,9 @@ # execution_engine2 (ee2) release notes ========================================= +## 0.0.3.3 + * Change log appends from update to $push + * Change first log linepos to position 0 instead of 1 + ## 0.0.3.2 * Change 2 db state updates for startjob into 1 * Rework Perms Model