From 8a1d8cb682f7040e2a47f9ac71af504f405d8f5c Mon Sep 17 00:00:00 2001 From: Nichovlasy Date: Sat, 21 Sep 2024 12:05:55 +0800 Subject: [PATCH] =?UTF-8?q?repair=20the=20problem=20from=20weibanhelper?= =?UTF-8?q?=E2=80=98s=20change?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 紧急恢复由于修改功能导致weibanhelper改变或缺失原有的部分,恢复原有功能的正常运行 --- WeiBanHelper.py | 97 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 72 insertions(+), 25 deletions(-) diff --git a/WeiBanHelper.py b/WeiBanHelper.py index e1cd739..7243032 100644 --- a/WeiBanHelper.py +++ b/WeiBanHelper.py @@ -6,8 +6,8 @@ import ddddocr import requests import json +import datetime from datetime import datetime - import random from PIL import Image @@ -29,10 +29,12 @@ class WeibanHelper: exam_threshold = 1 headers = { "X-Token": "", - "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", + "ContentType": "application/x-www-form-urlencoded; charset=UTF-8", "User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.82", } + tempUserCourseId = "" + def __init__(self, account, password, school_name, auto_verify=False, project_index=0): self.ocr = ddddocr.DdddOcr(show_ad=False) tenant_code = self.get_tenant_code(school_name=school_name) @@ -40,6 +42,7 @@ def __init__(self, account, password, school_name, auto_verify=False, project_in verify_time = time.time() self.session = self.create_session() + login_data = {} # 验证码处理 verify_code = '' if not auto_verify: @@ -48,20 +51,22 @@ def __init__(self, account, password, school_name, auto_verify=False, project_in verify_code = input("请输入验证码: ") else: verify_code = self.ocr.classification(self.get_verify_code(get_time=verify_time, download=False)) - login_data = self.login(account, password, tenant_code, verify_code, verify_time) if auto_verify: - while login_data['code'] == '-1' and "验证码" in str(login_data): + while login_data['code'] == '-1' and str(login_data).find("验证码") != -1: verify_time = time.time() verify_code = self.ocr.classification(self.get_verify_code(get_time=verify_time, download=False)) login_data = self.login(account, password, tenant_code, verify_code, verify_time) time.sleep(5) - + # 假设login_data是从某个请求返回的JSON数据中获取的 if 'data' in login_data: login_data = login_data['data'] - self.project_list = WeibanHelper.get_project_id(login_data["userId"], tenant_code, login_data["token"]) + self.project_list = WeibanHelper.get_project_id( + login_data["userId"], tenant_code, login_data["token"] + ) else: + # 如果 'data' 键不存在,输出提示信息 print("登录失败,可能是学校名称输入错误。\n") print(f"返回的错误信息: {login_data}\n") @@ -76,11 +81,14 @@ def init(self, code, id, token, projectId): self.headers["X-Token"] = self.x_token def create_session(self): + """ + 创建一个带有重试策略的会话对象。 + """ session = requests.Session() retry_strategy = Retry( total=5, status_forcelist=[429, 500, 502, 503, 504], - allowed_methods=["HEAD", "GET", "OPTIONS", "POST"], + allowed_methods=["HEAD", "GET", "OPTIONS", "POST"], # 替换 `method_whitelist` backoff_factor=1 ) adapter = HTTPAdapter(max_retries=retry_strategy) @@ -89,17 +97,24 @@ def create_session(self): return session def retry_request(self, func, *args, retry_count=5, wait_time=3): + """ + 封装的重试请求方法。 + """ for attempt in range(retry_count): try: - return func(*args) + return func(*args) # 调用传入的函数并返回其结果 except (SSLError, Timeout, ConnectionError, HTTPError, RequestException, ProxyError) as e: print(f"网络错误 [{type(e).__name__}]: {e},URL: {args[0]},正在重试 {attempt + 1} / {retry_count} 次...") - time.sleep(wait_time) + time.sleep(wait_time) # 等待指定时间后重试 if attempt == retry_count - 1: print("达到最大重试次数,跳过此操作。") - return None + return None # 如果最终失败,返回 None def start(self, courseId): + """ + 启动课程学习的请求方法,包含重试机制和网络错误处理。 + :param courseId: 课程ID,用于启动指定的课程学习。 + """ url = "https://weiban.mycourse.cn/pharos/usercourse/study.do" data = { "userProjectId": self.userProjectId, @@ -107,15 +122,33 @@ def start(self, courseId): "userId": self.userId, "courseId": courseId, } + headers = {"x-token": self.x_token} try: - response = self.session.post(url, data=data, headers=self.headers, timeout=10, verify=False) - + # 初始请求 + response = self.session.post( + url, + data=data, + headers=headers, + proxies={"http": None, "https": None}, # 禁用代理 + timeout=10, # 设置超时时间 + verify=False # 如果需要跳过 SSL 证书验证 + ) + + # 重试机制,直到请求成功 while json.loads(response.text).get("code") == -1: print("请求未成功,继续等待并重试...") - time.sleep(5) - response = self.session.post(url, data=data, headers=self.headers, timeout=30, verify=False) - + time.sleep(5) # 等待5秒后重试 + response = self.session.post( + url, + data=data, + headers=headers, + proxies={"http": None, "https": None}, # 禁用代理 + timeout=30, + verify=False + ) + + # 检查请求状态码 if response.status_code == 200: print("课程启动成功") else: @@ -123,6 +156,7 @@ def start(self, courseId): except (ProxyError, SSLError, Timeout, ConnectionError, HTTPError, RequestException) as e: print(f"网络错误 [{type(e).__name__}]: {e},URL: {url}") + # 进一步处理或记录错误 def run(self): for chooseType in [2, 3]: @@ -148,9 +182,14 @@ def run(self): index += 1 print(f"chooseType={chooseType} 的课程刷课完成") + # js里的时间戳似乎都是保留了三位小数的. def __get_timestamp(self): return str(round(time.time(), 3)) + # Magic: 用于构造、拼接"完成学习任务"的url + def __gen_rand(self): + return ("3.4.1" + str(random.random())).replace(".", "") + def getProgress(self): url = "https://weiban.mycourse.cn/pharos/project/showProgress.do" data = { @@ -163,9 +202,12 @@ def getProgress(self): return data["data"]["progressPet"] def getAnswerList(self): + """ + 获取答题记录的列表,通过逐条获取的方式处理多个记录 + """ answer_list = [] url = "https://weiban.mycourse.cn/pharos/exam/reviewPaper.do?timestamp=" + self.__get_timestamp() - exam_id_list = self.listHistory() + exam_id_list = self.listHistory() # 调用 listHistory 来获取多个考试ID for exam_id in exam_id_list: data = { "tenantCode": self.tenantCode, @@ -174,13 +216,17 @@ def getAnswerList(self): "isRetake": "2" } response = self.session.post(url, data=data, headers=self.headers) - answer_list.append(response.text) + if response.status_code == 200: + answer_list.append(response.text) # 存储每条考试的答题记录 return answer_list def listHistory(self): + """ + 获取用户的历史考试记录,并返回多个考试ID + """ result = [] url = "https://weiban.mycourse.cn/pharos/exam/listHistory.do?timestamp=" + self.__get_timestamp() - exam_plan_id_list = self.listExamPlan() + exam_plan_id_list = self.listExamPlan() # 获取考试计划ID列表 for exam_plan_id in exam_plan_id_list: dataList = { "tenantCode": self.tenantCode, @@ -188,17 +234,18 @@ def listHistory(self): "examPlanId": exam_plan_id } response = self.session.post(url, headers=self.headers, data=dataList) - data = json.loads(response.text) if data['code'] == '-1': - continue # 跳过此计划 + return result else: - for item in data['data']: - result.append(item['id']) # 添加所有的考试记录ID - + for history in data['data']: # 遍历历史考试记录 + result.append(history['id']) return result def listExamPlan(self): + """ + 获取用户的考试计划ID列表 + """ url = "https://weiban.mycourse.cn/pharos/record/project/listExamPlanStat.do?timestamp=" + self.__get_timestamp() data = { "tenantCode": self.tenantCode, @@ -221,8 +268,8 @@ def getCategory(self, chooseType): "chooseType": chooseType, } response = requests.post(url, data=data, headers=self.headers) - data = json.loads(response.text) - for i in data["data"]: + list = json.loads(response.text)["data"] + for i in list: if i["totalNum"] > i["finishedNum"]: result.append(i["categoryCode"]) return result