From 86adf2621ade33065f81c7a0a104335806f3f323 Mon Sep 17 00:00:00 2001 From: shoucandanghehe Date: Thu, 21 Sep 2023 22:32:27 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=E4=BF=AE=E5=A4=8D=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=20ID=20=E7=BB=91=E5=AE=9A=E8=B4=A6=E5=8F=B7=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io_data_processor/processor.py | 103 ++++++++++-------- 1 file changed, 57 insertions(+), 46 deletions(-) diff --git a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py index d0cda4b6..573aeed5 100644 --- a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py +++ b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py @@ -16,7 +16,7 @@ class Processor: @classmethod async def handle_bind(cls, message: str, qq_number: int | None) -> str: - '''处理绑定消息''' + """处理绑定消息""" decoded_message = await handle_bind_message(message=message, game_type='IO') if decoded_message[0] is None: return decoded_message[1][0] @@ -25,6 +25,12 @@ async def handle_bind(cls, message: str, qq_number: int | None) -> str: if user_id_stats[0] is False: return user_id_stats[1] user_id = decoded_message[1][1] + if qq_number is None: # 理论上是不会有None出现的, ide快乐行属于是( + logger.error('获取QQ号失败') + return '获取QQ号失败' + return await DataBase.write_bind_info( + qq_number=qq_number, user=user_id, game_type='IO' + ) elif decoded_message[0] == 'Name': user_data = await cls.get_user_data(user_name=decoded_message[1][1]) if user_data[0] is False: @@ -32,18 +38,14 @@ async def handle_bind(cls, message: str, qq_number: int | None) -> str: if user_data[1] is False: return f'用户信息请求错误:\n{user_data[2]["error"]}' user_id = await cls.get_user_id(user_data[2]) - if qq_number is None: # 理论上是不会有None出现的, ide快乐行属于是( + if qq_number is None: # 理论上是不会有None出现的, ide快乐行属于是( logger.error('获取QQ号失败') return '获取QQ号失败' - return ( - await DataBase.write_bind_info( - qq_number=qq_number, - user=user_id, - game_type='IO' - ) + return await DataBase.write_bind_info( + qq_number=qq_number, user=user_id, game_type='IO' ) logger.error('预期外行为, 请上报GitHub') - return '出现预期外行为,请查看后台信息' + return '出现预期外行为, 请查看后台信息' @classmethod async def handle_rank(cls, message: str): @@ -74,11 +76,17 @@ async def handle_rank(cls, message: str): if query_rank.lower() not in (i for i in ranks_percentiles.keys()): return '未知段位' - result = await Request.request('https://ch.tetr.io/api/users/lists/league/all') + result = await Request.request( + 'https://ch.tetr.io/api/users/lists/league/all' + ) users: list = result[2]['data']['users'] - def avg(rank_users: list, column: str, playercount: int | None = None) -> float: - return sum(i['league'][column] for i in rank_users) / (playercount or len(rank_users)) + def avg( + rank_users: list, column: str, playercount: int | None = None + ) -> float: + return sum(i['league'][column] for i in rank_users) / ( + playercount or len(rank_users) + ) for rank, percentile in ranks_percentiles.items(): offset = math.floor((percentile / 100) * len(users)) - 1 @@ -97,8 +105,8 @@ def avg(rank_users: list, column: str, playercount: int | None = None) -> float: playercount=playercount, avgapm=avg_apm, avgpps=avg_pps, - avgvs=avg_vs - ) + avgvs=avg_vs, + ) return await Processor.handle_rank(message=message) else: @@ -123,12 +131,16 @@ def avg(rank_users: list, column: str, playercount: int | None = None) -> float: @classmethod async def handle_query(cls, message: str, qq_number: int | None): - '''处理查询消息''' - decoded_message = await handle_stats_query_message(message=message, game_type='IO') + """处理查询消息""" + decoded_message = await handle_stats_query_message( + message=message, game_type='IO' + ) if decoded_message[0] is None: return decoded_message[1][0] if decoded_message[0] == 'AT': # 在入口处判断是否@bot本身 - bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='IO') + bind_info = await DataBase.query_bind_info( + qq_number=decoded_message[1][1], game_type='IO' + ) if bind_info is None: return '未查询到绑定信息' return f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await Processor.generate_message(user_id=bind_info)}' @@ -136,7 +148,9 @@ async def handle_query(cls, message: str, qq_number: int | None): if qq_number is None: logger.error('获取QQ号失败') return '获取QQ号失败, 请联系bot主人' - bind_info = await DataBase.query_bind_info(qq_number=qq_number, game_type='IO') + bind_info = await DataBase.query_bind_info( + qq_number=qq_number, game_type='IO' + ) if bind_info is None: return '未查询到绑定信息' return f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await Processor.generate_message(user_id=bind_info)}' @@ -147,11 +161,9 @@ async def handle_query(cls, message: str, qq_number: int | None): @classmethod async def get_user_data( - cls, - user_name: str | None = None, - user_id: str | None = None + cls, user_name: str | None = None, user_id: str | None = None ) -> tuple[bool, bool, dict[str, Any]]: - '''获取用户数据''' + """获取用户数据""" if user_name is not None and user_id is None: user_data_url = f'https://ch.tetr.io/api/users/{user_name}' elif user_name is None and user_id is not None: @@ -162,11 +174,9 @@ async def get_user_data( @classmethod async def get_solo_data( - cls, - user_name: str | None = None, - user_id: str | None = None + cls, user_name: str | None = None, user_id: str | None = None ) -> tuple[bool, bool, dict[str, Any]]: - '''获取Solo数据''' + """获取Solo数据""" if user_name is not None and user_id is None: user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records' elif user_name is None and user_id is not None: @@ -177,12 +187,12 @@ async def get_solo_data( @classmethod async def get_user_id(cls, user_data: dict) -> str: - '''获取用户ID''' + """获取用户ID""" return user_data['data']['user']['_id'] @classmethod async def check_user_id(cls, user_id: str) -> tuple[bool, str]: - '''检查用户ID是否有效 返回值为tuple[bool, message]''' + """检查用户ID是否有效 返回值为tuple[bool, message]""" user_data = await cls.get_user_data(user_id=user_id) if user_data[0] is False: return False, '用户信息请求失败' @@ -190,18 +200,19 @@ async def check_user_id(cls, user_id: str) -> tuple[bool, str]: return False, f'用户信息请求错误:\n{user_data[2]["error"]}' if user_id == user_data[2]['data']['user']['_id']: return True, '' - raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x') + raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x') @classmethod async def get_league_stats(cls, user_data: dict) -> dict[str, Any]: - '''获取排位统计数据''' + """获取排位统计数据""" league = user_data['data']['user']['league'] league_stats: dict[str, Any] = {} if league['gamesplayed'] != 0: league_stats['PPS'] = league['pps'] league_stats['APM'] = league['apm'] league_stats['VS'] = 0 if league['vs'] is None else league['vs'] - league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper( + league_stats['Rank'] = ( + 'Z' if league['rank'] == 'z' else league['rank'].upper() ) if league['rating'] == -1: league_stats['Rank'] = None @@ -211,28 +222,29 @@ async def get_league_stats(cls, user_data: dict) -> dict[str, Any]: league_stats['RD'] = round(league['rd'], 2) league_stats['Standing'] = league['standing'] league_stats['LPM'] = round((league['pps'] * 24), 2) - league_stats['APL'] = round( - (league_stats['APM'] / league_stats['LPM']), 2) + league_stats['APL'] = round((league_stats['APM'] / league_stats['LPM']), 2) league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2) league_stats['ADPL'] = round( - (league_stats['ADPM'] / league_stats['LPM']), 2) + (league_stats['ADPM'] / league_stats['LPM']), 2 + ) return league_stats @classmethod async def get_sprint_stats(cls, solo_data: dict) -> dict[str, Any]: - '''获取40L统计数据''' + """获取40L统计数据""" sprint_stats = {} solo = solo_data['data']['records']['40l'] if solo['record'] is not None: sprint_stats['Time'] = round( - solo['record']['endcontext']['finalTime'] / 1000, 2) + solo['record']['endcontext']['finalTime'] / 1000, 2 + ) if solo['rank'] is not None: sprint_stats['Rank'] = solo['rank'] return sprint_stats @classmethod async def get_blitz_stats(cls, solo_data: dict) -> dict[str, Any]: - '''获取Blitz统计数据''' + """获取Blitz统计数据""" blitz_stats = {} blitz = solo_data['data']['records']['blitz'] if blitz['record'] is not None: @@ -243,14 +255,12 @@ async def get_blitz_stats(cls, solo_data: dict) -> dict[str, Any]: @classmethod async def generate_message( - cls, - user_name: str | None = None, - user_id: str | None = None + cls, user_name: str | None = None, user_id: str | None = None ) -> str: - '''生成消息''' + """生成消息""" user_data, solo_data = await gather( cls.get_user_data(user_name=user_name, user_id=user_id), - cls.get_solo_data(user_name=user_name, user_id=user_id) + cls.get_solo_data(user_name=user_name, user_id=user_id), ) if user_data[0] is False: return '用户信息请求失败' @@ -269,18 +279,19 @@ async def generate_message( message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR' else: message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})' - message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:' + message += ( + f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:' + ) message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )' message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )' - if league_stats["VS"] != 0: + if league_stats['VS'] != 0: message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )' if solo_data[0] is False: return f'{message}\nSolo统计数据请求失败' if solo_data[1] is False: return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}' sprint_stats, blitz_stats = await gather( - cls.get_sprint_stats(solo_data[2]), - cls.get_blitz_stats(solo_data[2]) + cls.get_sprint_stats(solo_data[2]), cls.get_blitz_stats(solo_data[2]) ) message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else '' message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else ''