Skip to content

Commit

Permalink
Merge pull request #245 from kbase/change_logs_to_push
Browse files Browse the repository at this point in the history
Change logs to push
  • Loading branch information
Tianhao-Gu authored Jun 5, 2020
2 parents 9bb36c1 + d38e3fb commit c4f1fa2
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 106 deletions.
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# execution_engine2 (ee2) release notes
=========================================
## 0.0.3.3
* Change log appends from update to $push
* Change first log linepos to position 0 instead of 1

## 0.0.3.2
* Change 2 db state updates for startjob into 1
* Rework Perms Model
Expand Down
54 changes: 37 additions & 17 deletions lib/execution_engine2/db/MongoUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ def _start_local_service(self):

@classmethod
def _get_collection(
self,
mongo_host: str,
mongo_port: int,
mongo_database: str,
mongo_user: str = None,
mongo_password: str = None,
mongo_authmechanism: str = "DEFAULT",
self,
mongo_host: str,
mongo_port: int,
mongo_database: str,
mongo_user: str = None,
mongo_password: str = None,
mongo_authmechanism: str = "DEFAULT",
):
"""
Connect to Mongo server and return a tuple with the MongoClient and MongoClient?
Expand Down Expand Up @@ -232,8 +232,8 @@ def get_jobs(self, job_ids=None, exclude_fields=None, sort_id_ascending=None):
raise ValueError("Please input a list type exclude_fields")
jobs = (
Job.objects(id__in=job_ids)
.exclude(*exclude_fields)
.order_by("{}_id".format(sort_id_indicator))
.exclude(*exclude_fields)
.order_by("{}_id".format(sort_id_indicator))
)

else:
Expand Down Expand Up @@ -417,12 +417,6 @@ def update_job_status(self, job_id, status, msg=None, error_message=None):

j.save()

def get_empty_job_log(self):
jl = JobLog()
jl.stored_line_count = 0
jl.original_line_count = 0
return jl

@contextmanager
def mongo_engine_connection(self):
yield self.me_connection
Expand All @@ -439,17 +433,43 @@ def insert_one(self, doc):
self.mongo_collection
].insert_one(doc)
except Exception as e:
error_msg = "Connot insert doc\n"
error_msg = "Cannot insert doc\n"
error_msg += "ERROR -- {}:\n{}".format(
e, "".join(traceback.format_exception(None, e, e.__traceback__))
)
raise ValueError(error_msg)

return rec.inserted_id

def _push_job_logs(self, log_lines: JobLog, job_id: str, record_count: int):
""" append a list of job logs, and update the record count """

update_filter = {"_id": ObjectId(job_id)}
push_op = {"lines": {"$each": log_lines}}

set_op = {
"original_line_count": record_count,
"stored_line_count": record_count,
"updated": time.time(),
}
update = {"$push": push_op, "$set": set_op}
with self.pymongo_client(self.mongo_collection) as pymongo_client:
job_col = pymongo_client[self.mongo_database][self.mongo_collection]
try:
job_col.update_one(update_filter, update, upsert=False)
except Exception as e:
error_msg = "Cannot update doc\n ERROR -- {}:\n{}".format(
e, "".join(traceback.format_exception(None, e, e.__traceback__))
)
raise ValueError(error_msg)

slc = job_col.find_one({"_id": ObjectId(job_id)}).get("stored_line_count")

return slc

def update_one(self, doc, job_id):
"""
update existing records
update existing records or create if they do not exist
https://docs.mongodb.com/manual/reference/operator/update/set/
"""
with self.pymongo_client(self.mongo_collection) as pymongo_client:
Expand Down
117 changes: 57 additions & 60 deletions lib/execution_engine2/sdk/EE2Logs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import time
from enum import Enum
from typing import Dict, NamedTuple

from lib.execution_engine2.db.models.models import JobLog as JLModel, LogLines
from lib.execution_engine2.exceptions import RecordNotFoundException


# if TYPE_CHECKING:
# from lib.execution_engine2.sdk.SDKMethodRunner import SDKMethodRunner


class JobPermissions(Enum):
READ = "r"
WRITE = "w"
Expand All @@ -20,44 +23,63 @@ class AddLogResult(NamedTuple):
class EE2Logs:
def __init__(self, sdkmr):
self.sdkmr = sdkmr
self.mongo_util = self.sdkmr.get_mongo_util()

@staticmethod
def _create_new_log(pk):
jl = JLModel()
jl.primary_key = pk
jl.original_line_count = 0
jl.stored_line_count = 0
jl.lines = []
return jl

def _add_job_logs_helper(self, ee2_log: JLModel, log_lines: list):
"""
:param ee2_log: The mongo ee2_log to operate on
:param log_lines: The lines to add to this log
:return:
"""
original_line_count = ee2_log.get("original_line_count")
def _format_job_logs(self, record_position, log_lines):

log_lines_formatted = []
for input_line in log_lines:
original_line_count += 1
record_position += 1
ll = LogLines()
ll.error = int(input_line.get("is_error", 0)) == 1
ll.linepos = original_line_count

ll.linepos = record_position
ts = input_line.get("ts")
if ts is not None:
ts = self.sdkmr.check_and_convert_time(ts, assign_default_time=True)
ll.ts = ts

ll.line = input_line.get("line")
ll.validate()
ee2_log["lines"].append(ll.to_mongo().to_dict())
log_lines_formatted.append(ll.to_mongo().to_dict())

ee2_log["updated"] = time.time()
ee2_log["original_line_count"] = original_line_count
ee2_log["stored_line_count"] = original_line_count
return log_lines_formatted

return ee2_log
def _create_new_log(self, pk, log_lines: list):
"""
:param ee2_log: The mongo ee2_log to operate on
:param log_lines: The lines to add to this log
:return:
"""
with self.mongo_util.mongo_engine_connection():
jl = JLModel()
jl.primary_key = pk
jl.original_line_count = 0
jl.stored_line_count = 0
jl.lines = self._format_job_logs(record_position=-1, log_lines=log_lines)
jl.original_line_count = jl.stored_line_count = len(log_lines)
jl.save()
return jl

def _add_first_logs(self, log_lines, job_id):
"""
Initialize the log since it doesn't exist
:param log_lines:
:param job_id:
:return:
"""
self.sdkmr.logger.debug(f"About to create new log record for {job_id}")
log = self._create_new_log(pk=job_id, log_lines=log_lines)
return AddLogResult(success=True, stored_line_count=log.stored_line_count)

def _add_subsequent_logs(self, job_log, log_lines):
""" Add logs to an existing log entry """
formatted_logs = self._format_job_logs(
record_position=job_log["stored_line_count"] - 1, log_lines=log_lines
)
record_count = int(job_log["stored_line_count"]) + len(formatted_logs)
slc = self.mongo_util._push_job_logs(
formatted_logs, job_id=job_log["_id"], record_count=record_count
)
return AddLogResult(success=True, stored_line_count=slc)

def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult:
"""
Expand All @@ -80,44 +102,21 @@ def add_job_logs(self, job_id, log_lines, as_admin=False) -> AddLogResult:
:param as_admin:
:return:
"""
log = {"stored_line_count": -1}

job_log = {"stored_line_count": -1}
self.sdkmr.get_job_with_permission(
job_id, JobPermissions.WRITE, as_admin=as_admin
)

self.sdkmr.logger.debug(f"About to add logs for {job_id}")
try:
self.sdkmr.logger.debug(f"About to add logs for {job_id}")
mongo_util = self.sdkmr.get_mongo_util()
try:
log = mongo_util.get_job_log_pymongo(job_id)
job_log = self.mongo_util.get_job_log_pymongo(job_id)
except RecordNotFoundException:
# What really should happen is the log is created and then SAVED, and then
# Retrieved again, and then we should use native log line append to the record
# INstead of updating the entire record.. And then update the line positions and line counts
# upon successfull appending of ALL logs
log = self._create_new_log(pk=job_id).to_mongo().to_dict()

log = self._add_job_logs_helper(ee2_log=log, log_lines=log_lines)

try:
with mongo_util.pymongo_client(
self.sdkmr.config["mongo-logs-collection"]
):
mongo_util.update_one(log, str(log.get("_id")))
except Exception as e:
self.sdkmr.logger.error(e)
ll = [{"line": f"{e}", "is_error": 1}]
log = self._add_job_logs_helper(ee2_log=log, log_lines=ll)
olc = mongo_util.update_one(log, str(log.get("_id")))
return olc

return self._add_first_logs(log_lines=log_lines, job_id=job_id)
return self._add_subsequent_logs(job_log, log_lines)
except Exception as e:
self.sdkmr.logger.error(e)
return AddLogResult(
success=False, stored_line_count=log["stored_line_count"]
)
except Exception:
return AddLogResult(
success=False, stored_line_count=log["stored_line_count"]
success=False, stored_line_count=job_log["stored_line_count"]
)

def _get_job_logs(self, job_id, skip_lines, limit=None) -> Dict:
Expand Down Expand Up @@ -146,12 +145,10 @@ def _get_job_logs(self, job_id, skip_lines, limit=None) -> Dict:
:return:
"""

log = self.sdkmr.get_mongo_util().get_job_log_pymongo(job_id)

log = self.mongo_util.get_job_log_pymongo(job_id)
lines = []
last_line_number = 0
count = len(log.get("lines", []))

for log_line in log.get("lines", []): # type: LogLines

if skip_lines and int(skip_lines) >= log_line.get("linepos", 0):
Expand Down
23 changes: 13 additions & 10 deletions test/tests_for_sdkmr/ee2_SDKMethodRunner_EE2Logs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def test_add_job_logs_ok(self, rq_mock):

test_line = ori_lines[0]

self.assertEqual(test_line["line"], "Hello world")
self.assertEqual(test_line["linepos"], 1)
self.assertEquals(test_line["line"], "Hello world")
self.assertEqual(test_line["linepos"], 0)
self.assertEqual(test_line["error"], False)

# add job log
Expand All @@ -98,38 +98,41 @@ def test_add_job_logs_ok(self, rq_mock):
]

runner.add_job_logs(job_id=job_id, log_lines=lines)

#
log = self.mongo_util.get_job_log(job_id=job_id)
self.assertTrue(log.updated)
self.assertTrue(ori_updated_time < log.updated)
self.assertEqual(log.original_line_count, 3)
self.assertEqual(log.stored_line_count, 3)
ori_lines = log.lines
self.assertEqual(len(ori_lines), 3)

#
# original line

test_line = ori_lines[0]
print(test_line)
self.assertEqual(test_line["line"], "Hello world")
self.assertEqual(test_line["linepos"], 1)
self.assertEqual(test_line["error"], 0)
print(ori_lines)
self.assertEqual("Hello world", test_line["line"])
self.assertEqual(0, test_line["linepos"])
self.assertEqual(0, test_line["error"])

# new line
test_line = ori_lines[1]

self.assertEqual(test_line["line"], "Hello Kbase")
self.assertEqual(test_line["linepos"], 2)
self.assertEqual(test_line["linepos"], 1)
self.assertEqual(test_line["error"], 1)

test_line = ori_lines[2]

self.assertEqual(test_line["line"], "Hello Wrold Kbase")
self.assertEqual(test_line["linepos"], 3)
self.assertEqual(test_line["linepos"], 2)
self.assertEqual(test_line["error"], 0)

self.mongo_util.get_job(job_id=job_id).delete()
self.assertEqual(ori_job_count, Job.objects.count())

self.mongo_util.get_job_log(job_id=job_id).delete()
self.assertEqual(ori_job_log_count, JobLog.objects.count())


#
Loading

0 comments on commit c4f1fa2

Please sign in to comment.