forked from Junbo2002/scu-course-select
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfunc.py
338 lines (295 loc) · 10.9 KB
/
func.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# -*- coding: UTF-8 -*-
import ast
import json
import random
import re
import time
import ddddocr
import colorama
import requests
from config import *
# Shared OCR engine used by get_captcha(); built once when the module loads.
ocr = ddddocr.DdddOcr(show_ad=False)
# Captures the quoted value that follows "redisKey" in the page fetched from
# redis_key_url; re.S lets the lazy match run across line breaks.
redis_key_re = re.compile('redisKey.*?\"(?P<redisKey>.*?)\"', re.S)
# NOTE(review): autoreset=True presumably makes colorama reset the colour
# after every print -- confirm against the colorama documentation.
colorama.init(autoreset=True)
# Automatic captcha recognition
def get_captcha(url, session):
    """
    Download a captcha image and OCR it until a 4-character code comes out.

    Used for both the login captcha and the course-selection captcha.
    :param url: address of the captcha image
    :param session: session object whose get() downloads the image
    :return: the recognised code, exactly 4 letters/digits long
    """
    while True:
        image_bytes = session.get(url=url, headers=header).content
        recognised = ocr.classification(image_bytes)
        # Drop everything that is not a letter or a digit (OCR noise).
        code = "".join(
            ch for ch in recognised if ch.isalpha() or ch.isdigit())
        # Any other length is treated as a misread: fetch a new image and
        # try again. There is no upper bound on the number of retries.
        if len(code) == 4:
            return code
# Login
def login(session):
    """
    Log in with the configured account and an OCR-recognised captcha.

    :param session: session object used for the GET/POST requests
    :return: "success" when the response contains the welcome text,
             "failed" when it does not,
             None when any step raised or the login page held no token
    """
    try:
        # The login form must echo back a 32-hex-digit token taken from the
        # login page. Fetching it inside the try block means a network
        # error or a page without a token is reported instead of crashing.
        page = session.get(login_token_url).text
        token_match = re.search("[a-fA-F0-9]{32}", page)
        if token_match is None:
            print("def login() 出现问题:" + "login token not found")
            return None
        login_data = {
            "tokenValue": token_match.group(0),
            'j_username': j_username,
            'j_password': j_password,
            'j_captcha': get_captcha(captcha_url, session)
        }
        response = session.post(
            url=login_url, headers=header, data=login_data).text
    except Exception as e:
        print("def login() 出现问题:" + str(e))
        return None

    if "欢迎您" in response:
        print("登陆成功!")
        return "success"
    print("\033[0;33;40m密码或验证码错误,正在尝试重新登陆\033[0m")
    return "failed"
# Fetch the courses that are already selected
def get_already_course(session):
    """
    Return the names of the courses that are already selected.

    :param session: session object used for the GET request
    :return: list with the 'courseName' of every entry in the first element
             of the response's 'xkxx' field, or None when the request or
             the parsing fails
    """
    try:
        response = session.get(
            url=already_select_course_url, headers=header).text
        # Parse the payload once instead of once per course.
        selected = json.loads(response)['xkxx'][0]
        return [selected[key]['courseName'] for key in selected]
    except Exception as e:
        print("def get_already_course() 出现问题:" + str(e))
        return None
# Select a course
def course_select(session, each_course, alreadySelectCourse, courseName, courseNum, coursekxhNum):
    """
    Try to enrol in one course section returned by the course search.

    :param session: session object used for every request
    :param each_course: dict describing one section; the keys read here are
        'kch' (course number), 'kxh' (section number), 'kcm' (course name),
        'skjs' (teacher) and 'bkskyl' (remaining seats)
    :param alreadySelectCourse: names of the courses already selected
    :param courseName: wanted course name (also sent as the search term)
    :param courseNum: wanted course number, compared with each_course['kch']
    :param coursekxhNum: acceptable section numbers, whitespace separated
    :return: True only when the final result text contains "选课成功";
        False otherwise (not a wanted section, no seats left, a
        prerequisite request failed, the submission was rejected, or an
        exception occurred while submitting)
    """
    is_target = (
        courseName not in alreadySelectCourse
        and courseNum == each_course['kch']
        and each_course['kxh'] in coursekxhNum.split()
    )
    if not is_target:
        return False

    no_seats = each_course['bkskyl'] <= 0
    summary = ("课程名:" + each_course['kcm'] + " 教师:" + each_course['skjs']
               + " 课余量:" + str(each_course['bkskyl']))
    # NOTE(review): the enrolment steps below run only when seats remain.
    # The nesting was reconstructed from a copy of the file that had lost
    # its indentation -- confirm against the repository.
    if no_seats:
        print("\033[0;33;40m" + summary + "\033[0m")
        return False
    print("\033[0;32;40m" + summary + "\033[0m")

    kcm = each_course['kcm']  # course name
    kch = each_course['kch']  # course number
    kxh = each_course['kxh']  # section number

    # This request is made before every submission and its response body is
    # not used; a failure aborts the attempt.
    if queryTeacherJL(session, kch, kxh) is None:
        return False

    kcms = getKcms(kcm + "(" + kch + "@" + kxh + ")")  # encoded course info
    course_name = kch + "@" + kxh + "@" + selectcourse_xueqi

    # get_token_and_captcha() may hand back a bare None on failure, which
    # cannot be unpacked; check before unpacking.
    token_info = get_token_and_captcha(session)
    if token_info is None:
        return False
    tokenValue, need_captcha = token_info
    if tokenValue is None:
        return False

    select_data = {
        'dealType': 5,
        'fajhh': fajhh,
        'kcIds': course_name,
        'kcms': kcms,
        'sj': '0_0',
        'searchtj': courseName,
        'kclbdm': '',
        'inputCode': '',
        'tokenValue': tokenValue
    }
    if need_captcha:
        code = get_captcha(select_captcha_url, session)
        print('读取验证码:' + code)
        select_data["inputCode"] = code

    try:
        submit_status = json.loads(
            session.post(url=select_url, data=select_data).text)["result"]
        print("选课状态:", submit_status)
        if submit_status != "ok":
            return False

        # Same form without the captcha and token fields; the returned page
        # carries the redisKey used to poll for the outcome.
        query_data = {
            'dealType': 5,
            'fajhh': fajhh,
            'kcIds': course_name,
            'kcms': kcms,
            'sj': '0_0',
            'searchtj': courseName,
            'kclbdm': ''
        }
        html = session.post(url=redis_key_url, data=query_data).text
        key_match = redis_key_re.search(html)
        # Fall back to the user name when the page holds no redisKey.
        redis_key = key_match.group("redisKey") if key_match else j_username
        select_result = {
            'kcNum': 1,
            'redisKey': redis_key
        }

        # Poll every 0.5 s until the server marks the request as finished.
        time.sleep(0.5)
        result = json.loads(
            session.post(url=select_result_url, data=select_result).text)
        while str(result["isFinish"]) not in ("True", "true"):
            time.sleep(0.5)
            result = json.loads(
                session.post(url=select_result_url, data=select_result).text)

        outcome = str(result["result"])
        print("选课结果" + outcome)
        return "选课成功" in outcome
    except Exception as e:
        print("def course_select()() 出现问题:" + str(e))
        return False
# Fetch the course-selection token and whether a captcha is required
def get_token_and_captcha(session):
    """
    Read the selection page and extract what a submission needs.

    :param session: session object used for the GET request
    :return: (token, need_captcha) where token is the first 32-hex-digit
             string in the page and need_captcha is True when the page
             contains the text if("1" == "1") exactly once;
             (None, False) when the request or the extraction fails, so
             callers can always unpack two values
    """
    try:
        response = session.get(url=course_select_url, headers=header).text
        # Raw string: "\(" is not a valid escape in a normal string literal.
        need_captcha = len(re.findall(r'if\("1" == "1"\)', response)) == 1
        token = re.compile("([a-fA-F0-9]{32})").findall(response)[0]
        return token, need_captcha
    except Exception as e:
        print("def get_token_and_captcha() 出现问题:" + str(e))
        return None, False
# Course-name encoding
def getKcms(kms):
    """
    Encode a course description for the POST form.

    Every character is replaced by its decimal code point followed by a
    comma, so the result ends with a trailing comma (empty input gives "").
    :param kms: text to encode
    :return: the encoded string
    """
    return "".join(str(ord(ch)) + "," for ch in kms)
# Query the remaining seats of a course
def get_free_course_list(session, courseName):
    """
    Search for a course and return the matching sections.

    :param session: session object used for the POST request
    :param courseName: search term sent as 'searchtj'
    :return: the decoded content of the response's 'rwRxkZlList' field, or
             None when the request or the decoding fails
    """
    list_data = {
        'searchtj': courseName,
        'xq': 0,
        'jc': 0,
        'kclbdm': ""
    }
    try:
        response = session.post(
            url=courseList_url, headers=header, data=list_data).content.decode()
        raw_list = json.loads(response)['rwRxkZlList']
        try:
            # The field is itself a serialised list; decoding it as JSON
            # accepts null/true/false, which ast.literal_eval rejects.
            return json.loads(raw_list)
        except ValueError:
            # Keep the previous behaviour for payloads that are Python
            # literals rather than JSON.
            return ast.literal_eval(raw_list)
    except Exception as e:
        print("def get_free_course_list() 出现问题:" + str(e))
        return None
# (Possibly) part of the registrar's anti-crawler mechanism
def queryTeacherJL(session, kch, kxh):
    """
    Post the section id to queryTeacherJL_url and return the response body.

    :param session: session object used for the POST request
    :param kch: course number
    :param kxh: section number
    :return: the decoded response text, or None when it is empty or the
             request raised
    """
    payload = {
        "id": "@".join((selectcourse_xueqi, kch, kxh))
    }
    try:
        body = session.post(url=queryTeacherJL_url,
                            data=payload, headers=header).content.decode()
    except Exception as e:
        print("def queryTeacherJL() 出现问题:" + str(e))
        return None
    return body if body else None
# Scheduled selection
def isSelectTime() -> bool:
    """
    Tell whether the current local time of day lies strictly between
    selectTime[0] and selectTime[1] (both "HH:MM:SS" strings).

    NOTE(review): `date` is expected to be the datetime module; it is not
    imported in this file, presumably it arrives via `from config import *`
    -- confirm.
    """
    clock_format = '%H:%M:%S'
    parse = date.datetime.strptime
    # Round-trip through strftime so only the time of day is compared.
    current = parse(time.strftime("%H:%M:%S", time.localtime()), clock_format)
    window_start = parse(selectTime[0], clock_format)
    window_end = parse(selectTime[1], clock_format)
    return window_start < current < window_end
# Refresh the wanted-course lists by dropping courses that are already selected
def updateCourse(select_course_idx):
    """
    Remove the given positions from the three parallel module-level lists
    courseNames, courseNums and coursekxhNums.

    The lists are rebound to new list objects, not modified in place.
    :param select_course_idx: indices (into courseNames) to drop; when it
        is empty nothing is touched
    :return: None
    """
    global courseNames, courseNums, coursekxhNums
    if len(select_course_idx) == 0:
        return
    # Build the lookup once so each index test is O(1).
    dropped = set(select_course_idx)
    kept = [i for i in range(len(courseNames)) if i not in dropped]
    courseNames, courseNums, coursekxhNums = (
        [courseNames[i] for i in kept],
        [courseNums[i] for i in kept],
        [coursekxhNums[i] for i in kept],
    )
def _wait_until_select_time():
    """Block, with a printed countdown, until isSelectTime() is true."""
    while not isSelectTime():
        print("当前时间:"+str(date.datetime.now().time()).split('.')[0]+" 在非设置选课时间")
        remaining = (
            date.datetime.strptime(selectTime[0], '%H:%M:%S')
            - date.datetime.strptime(
                time.strftime("%H:%M:%S", time.localtime()), '%H:%M:%S'))
        print("将在",remaining,"后准时开始抢课!")
        # Sleep until 10 s before the start, then count down second by
        # second. timedelta.seconds is never negative, so a start time that
        # has already passed today yields the wait until tomorrow's start.
        sleep_first = remaining.seconds - 10
        count_from = 11
        if sleep_first >= 0:
            time.sleep(sleep_first)
        else:
            # Fewer than 10 s left: shorten the countdown accordingly.
            count_from = 11 + sleep_first
        for i in range(count_from, 0, -1):
            print(i-1)
            time.sleep(1)


def main(session):
    """
    Log in, wait for the configured selection window, then keep trying to
    enrol in every wanted course until all of them are selected.

    :param session: session object shared by every request
    :return: None after 6 failed login attempts; otherwise the function
             only ends by raising SystemExit once no wanted course is left
    """
    max_login_attempts = 6
    for attempt in range(1, max_login_attempts + 1):
        if login(session) == "success":
            # Hold here until the selection window opens.
            _wait_until_select_time()
            print("\033[0;33;40m抢课开始! *_*\033[0m")
            break
        print("\033[0;33;40m" + f"第{attempt}次登陆失败\n" + "\033[0m")
        print()
    else:
        # Every attempt failed: give up.
        print("\033[0;33;40m登录失败,请检查学号密码正确性\033[0m")
        return

    clock = 1
    while True:
        print("\n正在第{}轮选课!".format(clock))
        # Look up what is already selected first.
        alreadySelectCourse = get_already_course(session)
        if alreadySelectCourse is None:
            # Retry the round, but pause first so a failing endpoint is not
            # hit in a tight loop.
            time.sleep(random.uniform(1.5, 3))
            continue

        select_course_idx = []
        for i, name in enumerate(courseNames):
            if name in alreadySelectCourse:
                select_course_idx.append(i)
                print("\033[0;31;40m你已经选上了 %s !\033[0m" % (name))
        # Rebinds the module-level course lists without the selected ones.
        updateCourse(select_course_idx)
        if len(courseNames) == 0:
            print("\033[0;33;40m选课完成 ^.^\033[0m")
            # Same effect as exit(), without relying on the helper that the
            # site module injects for interactive use.
            raise SystemExit

        for i in range(len(courseNames)):
            # Query the sections (and remaining seats) of the wanted course.
            courseList = get_free_course_list(session, courseNames[i])
            if courseList is not None:
                # Stop at the first section that is selected successfully.
                for each_course in courseList:
                    if course_select(session, each_course, alreadySelectCourse,
                                     courseNames[i], courseNums[i], coursekxhNums[i]):
                        break
            # NOTE(review): pause after every course query, including failed
            # ones. The placement was reconstructed from a copy of the file
            # that had lost its indentation -- confirm.
            time.sleep(random.uniform(1.5, 3))
        clock = clock + 1