Skip to content

Commit

Permalink
2024年8月16日 优化日志处理功能
Browse files Browse the repository at this point in the history
  • Loading branch information
ss1917 committed Aug 16, 2024
1 parent d58e7a9 commit 4fba234
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions mg/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@
Desc : 订阅redis的消息,写入数据库
"""

import datetime
import json
import logging
import time
import datetime
from concurrent.futures import ThreadPoolExecutor

import redis
import logging
from shortuuid import uuid
from websdk2.consts import const
from websdk2.db_context import DBContext

from models.paas_model import OperationRecords
from concurrent.futures import ThreadPoolExecutor
from websdk2.consts import const


class RedisSubscriber:
Expand Down Expand Up @@ -58,7 +60,11 @@ def process_message(msg_id, fields):
log_data_dict['scheme'] = request_data.get('scheme')
log_data_dict['uri'] = request_data.get('uri')[0:255]
log_data_dict['method'] = request_data.get('method')
log_data_dict['rq_headers'] = str(request_data.get('headers'))
try:
log_data_dict['rq_headers'] = str(request_data.get('headers'))
except Exception as err:
log_data_dict['rq_headers'] = "{}"
logging.error(f"格式化header {err}")
# log_data_dict['rq_query'] = request_data.get('method')

request_data_data = request_data.get('data')
Expand All @@ -78,9 +84,12 @@ def process_message(msg_id, fields):

log_data_dict['response_status'] = response_data.get('status')
log_data_dict['response_data'] = str(response_data)
start_time = log_data_dict.get('start_time')
start_time = int(start_time) / 1000
times = datetime.datetime.fromtimestamp(start_time)
try:
start_time = log_data_dict.get('start_time')
start_time = int(start_time) / 1000
times = datetime.datetime.fromtimestamp(start_time)
except Exception as err:
logging.error(f"datetime.datetime.fromtimestamp {start_time} {err}")
log_data_dict['start_time'] = times
# logging.info(log_data_dict)
return log_data_dict
Expand Down Expand Up @@ -112,11 +121,11 @@ def subscribe_msgs(self):
items = self.redis_conn.xreadgroup(self.group_name, self.consumer_name, {self.stream_name: consumer_id},
block=0, count=1)
except Exception as err:
print(err)
logging.error(f"xread group {err}")
items = []

if not items: # 如果 block 不是 0或者为空, 会需要这段
print("Timeout!")
logging.info("Timeout!")
self.stream_message(self.stream_name)
time.sleep(3) # 空值等待 3s
self.redis_conn.xack(self.stream_name, self.group_name, lastid) ### 删除错误信息
Expand Down

0 comments on commit 4fba234

Please sign in to comment.