diff --git a/mg/subscribe.py b/mg/subscribe.py index 0d8fdb9..2b8673f 100644 --- a/mg/subscribe.py +++ b/mg/subscribe.py @@ -7,16 +7,18 @@ Desc : 订阅redis的消息,写入数据库 """ +import datetime import json +import logging import time -import datetime +from concurrent.futures import ThreadPoolExecutor + import redis -import logging from shortuuid import uuid +from websdk2.consts import const from websdk2.db_context import DBContext + from models.paas_model import OperationRecords -from concurrent.futures import ThreadPoolExecutor -from websdk2.consts import const class RedisSubscriber: @@ -58,7 +60,11 @@ def process_message(msg_id, fields): log_data_dict['scheme'] = request_data.get('scheme') log_data_dict['uri'] = request_data.get('uri')[0:255] log_data_dict['method'] = request_data.get('method') - log_data_dict['rq_headers'] = str(request_data.get('headers')) + try: + log_data_dict['rq_headers'] = str(request_data.get('headers')) + except Exception as err: + log_data_dict['rq_headers'] = "{}" + logging.error(f"格式化header {err}") # log_data_dict['rq_query'] = request_data.get('method') request_data_data = request_data.get('data') @@ -78,9 +84,12 @@ def process_message(msg_id, fields): log_data_dict['response_status'] = response_data.get('status') log_data_dict['response_data'] = str(response_data) - start_time = log_data_dict.get('start_time') - start_time = int(start_time) / 1000 - times = datetime.datetime.fromtimestamp(start_time) + try: + start_time = log_data_dict.get('start_time') + start_time = int(start_time) / 1000 + times = datetime.datetime.fromtimestamp(start_time) + except Exception as err: + logging.error(f"datetime.datetime.fromtimestamp {start_time} {err}") log_data_dict['start_time'] = times # logging.info(log_data_dict) return log_data_dict @@ -112,11 +121,11 @@ def subscribe_msgs(self): items = self.redis_conn.xreadgroup(self.group_name, self.consumer_name, {self.stream_name: consumer_id}, block=0, count=1) except Exception as err: - print(err) + logging.error(f"xread group {err}") items = [] if not items: # 如果 block 不是 0或者为空, 会需要这段 - print("Timeout!") + logging.info("Timeout!") self.stream_message(self.stream_name) time.sleep(3) # 空值等待 3s self.redis_conn.xack(self.stream_name, self.group_name, lastid) ### 删除错误信息