Skip to content

Commit

Permalink
repair the problem from weibanhelper‘s change
Browse files Browse the repository at this point in the history
紧急恢复由于修改功能导致weibanhelper改变或缺失原有的部分,恢复原有功能的正常运行
  • Loading branch information
Nichovlasy committed Sep 21, 2024
1 parent e5f2101 commit 8a1d8cb
Showing 1 changed file with 72 additions and 25 deletions.
97 changes: 72 additions & 25 deletions WeiBanHelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import ddddocr
import requests
import json
import datetime
from datetime import datetime

import random

from PIL import Image
Expand All @@ -29,17 +29,20 @@ class WeibanHelper:
exam_threshold = 1
headers = {
"X-Token": "",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"ContentType": "application/x-www-form-urlencoded; charset=UTF-8",
"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.82",
}

tempUserCourseId = ""

def __init__(self, account, password, school_name, auto_verify=False, project_index=0):
self.ocr = ddddocr.DdddOcr(show_ad=False)
tenant_code = self.get_tenant_code(school_name=school_name)
img_file_uuid = ""
verify_time = time.time()
self.session = self.create_session()

login_data = {}
# 验证码处理
verify_code = ''
if not auto_verify:
Expand All @@ -48,20 +51,22 @@ def __init__(self, account, password, school_name, auto_verify=False, project_in
verify_code = input("请输入验证码: ")
else:
verify_code = self.ocr.classification(self.get_verify_code(get_time=verify_time, download=False))

login_data = self.login(account, password, tenant_code, verify_code, verify_time)

if auto_verify:
while login_data['code'] == '-1' and "验证码" in str(login_data):
while login_data['code'] == '-1' and str(login_data).find("验证码") != -1:
verify_time = time.time()
verify_code = self.ocr.classification(self.get_verify_code(get_time=verify_time, download=False))
login_data = self.login(account, password, tenant_code, verify_code, verify_time)
time.sleep(5)

# 假设login_data是从某个请求返回的JSON数据中获取的
if 'data' in login_data:
login_data = login_data['data']
self.project_list = WeibanHelper.get_project_id(login_data["userId"], tenant_code, login_data["token"])
self.project_list = WeibanHelper.get_project_id(
login_data["userId"], tenant_code, login_data["token"]
)
else:
# 如果 'data' 键不存在,输出提示信息
print("登录失败,可能是学校名称输入错误。\n")
print(f"返回的错误信息: {login_data}\n")

Expand All @@ -76,11 +81,14 @@ def init(self, code, id, token, projectId):
self.headers["X-Token"] = self.x_token

def create_session(self):
"""
创建一个带有重试策略的会话对象。
"""
session = requests.Session()
retry_strategy = Retry(
total=5,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"], # 替换 `method_whitelist`
backoff_factor=1
)
adapter = HTTPAdapter(max_retries=retry_strategy)
Expand All @@ -89,40 +97,66 @@ def create_session(self):
return session

def retry_request(self, func, *args, retry_count=5, wait_time=3):
"""
封装的重试请求方法。
"""
for attempt in range(retry_count):
try:
return func(*args)
return func(*args) # 调用传入的函数并返回其结果
except (SSLError, Timeout, ConnectionError, HTTPError, RequestException, ProxyError) as e:
print(f"网络错误 [{type(e).__name__}]: {e},URL: {args[0]},正在重试 {attempt + 1} / {retry_count} 次...")
time.sleep(wait_time)
time.sleep(wait_time) # 等待指定时间后重试
if attempt == retry_count - 1:
print("达到最大重试次数,跳过此操作。")
return None
return None # 如果最终失败,返回 None

def start(self, courseId):
"""
启动课程学习的请求方法,包含重试机制和网络错误处理。
:param courseId: 课程ID,用于启动指定的课程学习。
"""
url = "https://weiban.mycourse.cn/pharos/usercourse/study.do"
data = {
"userProjectId": self.userProjectId,
"tenantCode": self.tenantCode,
"userId": self.userId,
"courseId": courseId,
}
headers = {"x-token": self.x_token}

try:
response = self.session.post(url, data=data, headers=self.headers, timeout=10, verify=False)

# 初始请求
response = self.session.post(
url,
data=data,
headers=headers,
proxies={"http": None, "https": None}, # 禁用代理
timeout=10, # 设置超时时间
verify=False # 如果需要跳过 SSL 证书验证
)

# 重试机制,直到请求成功
while json.loads(response.text).get("code") == -1:
print("请求未成功,继续等待并重试...")
time.sleep(5)
response = self.session.post(url, data=data, headers=self.headers, timeout=30, verify=False)

time.sleep(5) # 等待5秒后重试
response = self.session.post(
url,
data=data,
headers=headers,
proxies={"http": None, "https": None}, # 禁用代理
timeout=30,
verify=False
)

# 检查请求状态码
if response.status_code == 200:
print("课程启动成功")
else:
print(f"请求失败,状态码: {response.status_code},URL: {url}")

except (ProxyError, SSLError, Timeout, ConnectionError, HTTPError, RequestException) as e:
print(f"网络错误 [{type(e).__name__}]: {e},URL: {url}")
# 进一步处理或记录错误

def run(self):
for chooseType in [2, 3]:
Expand All @@ -148,9 +182,14 @@ def run(self):
index += 1
print(f"chooseType={chooseType} 的课程刷课完成")

# js里的时间戳似乎都是保留了三位小数的.
def __get_timestamp(self):
return str(round(time.time(), 3))

# Magic: 用于构造、拼接"完成学习任务"的url
def __gen_rand(self):
return ("3.4.1" + str(random.random())).replace(".", "")

def getProgress(self):
url = "https://weiban.mycourse.cn/pharos/project/showProgress.do"
data = {
Expand All @@ -163,9 +202,12 @@ def getProgress(self):
return data["data"]["progressPet"]

def getAnswerList(self):
"""
获取答题记录的列表,通过逐条获取的方式处理多个记录
"""
answer_list = []
url = "https://weiban.mycourse.cn/pharos/exam/reviewPaper.do?timestamp=" + self.__get_timestamp()
exam_id_list = self.listHistory()
exam_id_list = self.listHistory() # 调用 listHistory 来获取多个考试ID
for exam_id in exam_id_list:
data = {
"tenantCode": self.tenantCode,
Expand All @@ -174,31 +216,36 @@ def getAnswerList(self):
"isRetake": "2"
}
response = self.session.post(url, data=data, headers=self.headers)
answer_list.append(response.text)
if response.status_code == 200:
answer_list.append(response.text) # 存储每条考试的答题记录
return answer_list

def listHistory(self):
"""
获取用户的历史考试记录,并返回多个考试ID
"""
result = []
url = "https://weiban.mycourse.cn/pharos/exam/listHistory.do?timestamp=" + self.__get_timestamp()
exam_plan_id_list = self.listExamPlan()
exam_plan_id_list = self.listExamPlan() # 获取考试计划ID列表
for exam_plan_id in exam_plan_id_list:
dataList = {
"tenantCode": self.tenantCode,
"userId": self.userId,
"examPlanId": exam_plan_id
}
response = self.session.post(url, headers=self.headers, data=dataList)

data = json.loads(response.text)
if data['code'] == '-1':
continue # 跳过此计划
return result
else:
for item in data['data']:
result.append(item['id']) # 添加所有的考试记录ID

for history in data['data']: # 遍历历史考试记录
result.append(history['id'])
return result

def listExamPlan(self):
"""
获取用户的考试计划ID列表
"""
url = "https://weiban.mycourse.cn/pharos/record/project/listExamPlanStat.do?timestamp=" + self.__get_timestamp()
data = {
"tenantCode": self.tenantCode,
Expand All @@ -221,8 +268,8 @@ def getCategory(self, chooseType):
"chooseType": chooseType,
}
response = requests.post(url, data=data, headers=self.headers)
data = json.loads(response.text)
for i in data["data"]:
list = json.loads(response.text)["data"]
for i in list:
if i["totalNum"] > i["finishedNum"]:
result.append(i["categoryCode"])
return result
Expand Down

0 comments on commit 8a1d8cb

Please sign in to comment.