-
Notifications
You must be signed in to change notification settings - Fork 69
/
jd_wskey.py
680 lines (631 loc) · 35.6 KB
/
jd_wskey.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
# -*- coding: utf-8 -*
'''
new Env('wskey转换');
'''
import socket # 用于端口检测
import base64 # 用于编解码
import json # 用于Json解析
import os # 用于导入系统变量
import sys # 实现 sys.exit
import logging # 用于日志输出
import time # 时间
import re # 正则过滤
import hmac
import struct
WSKEY_MODE = 0
# 0 = Default / 1 = Debug!
if "WSKEY_DEBUG" in os.environ or WSKEY_MODE: # 判断调试模式变量
logging.basicConfig(level=logging.DEBUG, format='%(message)s') # 设置日志为 Debug等级输出
logger = logging.getLogger(__name__) # 主模块
logger.debug("\nDEBUG模式开启!\n") # 消息输出
else: # 判断分支
logging.basicConfig(level=logging.INFO, format='%(message)s') # Info级日志
logger = logging.getLogger(__name__) # 主模块
try: # 异常捕捉
import requests # 导入HTTP模块
except Exception as e: # 异常捕捉
logger.info(str(e) + "\n缺少requests模块, 请执行命令:pip3 install requests\n") # 日志输出
sys.exit(1) # 退出脚本
os.environ['no_proxy'] = '*' # 禁用代理
requests.packages.urllib3.disable_warnings() # 抑制错误
try: # 异常捕捉
from notify import send # 导入青龙消息通知模块
except Exception as err: # 异常捕捉
logger.debug(str(err)) # 调试日志输出
logger.info("无推送文件") # 标准日志输出
ver = 21212 # 版本号
# def ql_2fa():
# ''' Demo
# if "WSKEY_TOKEN" in os.environ:
# url = 'http://127.0.0.1:{0}/api/user'.format(port) # 设置 URL地址
# try: # 异常捕捉
# res = s.get(url) # HTTP请求 [GET] 使用 session
# except Exception as err: # 异常捕捉
# logger.debug(str(err)) # 调试日志输出
# else: # 判断分支
# if res.status_code == 200 and res.json()["code"] == 200:
# twoFactorActivated = str(res.json()["data"]["twoFactorActivated"])
# if twoFactorActivated == 'true':
# logger.info("青龙 2FA 已开启!")
# url = 'http://127.0.0.1:{0}/api/envs?searchValue=WSKEY_Client'.format(port) # 设置 URL地址
# res = s.get(url)
# if res.status_code == 200 and res.json()["code"] == 200:
# data = res.json()["data"]
# if len(data) == 0:
# url = 'http://127.0.0.1:{0}/api/apps'
# data = json.dumps({
# "name": "wskey",
# "scopes": ["crons", "envs", "configs", "scripts", "logs", "dependencies", "system"]
# })
# res = s.post(url, data=data)
# if res.status_code == 200 and res.json()["code"] == 200:
# logger.info("OpenApi创建成功")
# client_id = res.json()["data"]["client_id"]
# client_secret = res.json()["data"]["client_secret"]
# wskey_value = 'client_id={0}&client_secret={1}'.format(client_id, client_secret)
# data = [{"value": wskey_value, "name": "WSKEY_Client", "remarks": "WSKEY_OpenApi请勿删除"}]
# data = json.dumps(data) # Json格式化数据
# url = 'http://127.0.0.1:{0}/api/envs'.format(port) # 设置 URL地址
# s.post(url=url, data=data) # HTTP[POST]请求 使用session
# logger.info("\nWSKEY_Client变量添加完成\n--------------------\n") # 标准日志输出
# '''
def ttotp(key):
key = base64.b32decode(key.upper() + '=' * ((8 - len(key)) % 8))
counter = struct.pack('>Q', int(time.time() / 30))
mac = hmac.new(key, counter, 'sha1').digest()
offset = mac[-1] & 0x0f
binary = struct.unpack('>L', mac[offset:offset + 4])[0] & 0x7fffffff
return str(binary)[-6:].zfill(6)
def ql_send(text):
if "WSKEY_SEND" in os.environ and os.environ["WSKEY_SEND"] == 'disable':
return True
else:
try: # 异常捕捉
send('WSKEY转换', text) # 消息发送
except Exception as err: # 异常捕捉
logger.debug(str(err)) # Debug日志输出
logger.info("通知发送失败") # 标准日志输出
# 登录青龙 返回值 token
def get_qltoken(username, password, twoFactorSecret): # 方法 用于获取青龙 Token
logger.info("Token失效, 新登陆\n") # 日志输出
if twoFactorSecret:
try:
twoCode = ttotp(twoFactorSecret)
except Exception as err:
logger.debug(str(err)) # Debug日志输出
logger.info("TOTP异常")
sys.exit(1)
url = ql_url + "api/user/login" # 设置青龙地址 使用 format格式化自定义端口
payload = json.dumps({
'username': username,
'password': password
}) # HTTP请求载荷
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
} # HTTP请求头 设置为 Json格式
try: # 异常捕捉
res = requests.post(url=url, headers=headers, data=payload) # 使用 requests模块进行 HTTP POST请求
if res.status_code == 200 and res.json()["code"] == 420:
url = ql_url + 'api/user/two-factor/login'
data = json.dumps({
"username": username,
"password": password,
"code": twoCode
})
res = requests.put(url=url, headers=headers, data=data)
if res.status_code == 200 and res.json()["code"] == 200:
token = res.json()["data"]['token'] # 从 res.text 返回值中 取出 Token值
return token
else:
logger.info("两步校验失败\n") # 日志输出
sys.exit(1)
elif res.status_code == 200 and res.json()["code"] == 200:
token = res.json()["data"]['token'] # 从 res.text 返回值中 取出 Token值
return token
except Exception as err:
logger.debug(str(err)) # Debug日志输出
sys.exit(1)
else:
url = ql_url + 'api/user/login'
payload = {
'username': username,
'password': password
} # HTTP请求载荷
payload = json.dumps(payload) # json格式化载荷
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
} # HTTP请求头 设置为 Json格式
try: # 异常捕捉
res = requests.post(url=url, headers=headers, data=payload) # 使用 requests模块进行 HTTP POST请求
if res.status_code == 200 and res.json()["code"] == 200:
token = res.json()["data"]['token'] # 从 res.text 返回值中 取出 Token值
return token
else:
ql_send("青龙登录失败!")
sys.exit(1) # 脚本退出
except Exception as err:
logger.debug(str(err)) # Debug日志输出
logger.info("使用旧版青龙登录接口")
url = ql_url + 'api/login' # 设置青龙地址 使用 format格式化自定义端口
payload = {
'username': username,
'password': password
} # HTTP请求载荷
payload = json.dumps(payload) # json格式化载荷
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
} # HTTP请求头 设置为 Json格式
try: # 异常捕捉
res = requests.post(url=url, headers=headers, data=payload) # 使用 requests模块进行 HTTP POST请求
token = json.loads(res.text)["data"]['token'] # 从 res.text 返回值中 取出 Token值
except Exception as err: # 异常捕捉
logger.debug(str(err)) # Debug日志输出
logger.info("青龙登录失败, 请检查面板状态!") # 标准日志输出
ql_send('青龙登陆失败, 请检查面板状态.')
sys.exit(1) # 脚本退出
else: # 无异常执行分支
return token # 返回 token值
# else: # 无异常执行分支
# return token # 返回 token值
# 返回值 Token
def ql_login(): # 方法 青龙登录(获取Token 功能同上)
path = '/ql/config/auth.json' # 设置青龙 auth文件地址
if not os.path.isfile(path):
path = '/ql/data/config/auth.json' # 尝试设置青龙 auth 新版文件地址
if os.path.isfile(path): # 进行文件真值判断
with open(path, "r") as file: # 上下文管理
auth = file.read() # 读取文件
file.close() # 关闭文件
auth = json.loads(auth) # 使用 json模块读取
username = auth["username"] # 提取 username
password = auth["password"] # 提取 password
token = auth["token"] # 提取 authkey
try:
twoFactorSecret = auth["twoFactorSecret"]
except Exception as err:
logger.debug(str(err)) # Debug日志输出
twoFactorSecret = ''
if token == '': # 判断 Token是否为空
return get_qltoken(username, password, twoFactorSecret) # 调用方法 get_qltoken 传递 username & password
else: # 判断分支
url = ql_url + 'api/user' # 设置URL请求地址 使用 Format格式化端口
headers = {
'Authorization': 'Bearer {0}'.format(token),
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36 Edg/94.0.992.38'
} # 设置用于 HTTP头
res = requests.get(url=url, headers=headers) # 调用 request模块发送 get请求
if res.status_code == 200: # 判断 HTTP返回状态码
return token # 有效 返回 token
else: # 判断分支
return get_qltoken(username, password, twoFactorSecret) # 调用方法 get_qltoken 传递 username & password
else: # 判断分支
logger.info("没有发现auth文件, 你这是青龙吗???") # 输出标准日志
sys.exit(0) # 脚本退出
# 返回值 list[wskey]
def get_wskey(): # 方法 获取 wskey值 [系统变量传递]
if "JD_WSCK" in os.environ: # 判断 JD_WSCK是否存在于环境变量
wskey_list = os.environ['JD_WSCK'].split('&') # 读取系统变量 以 & 分割变量
if len(wskey_list) > 0: # 判断 WSKEY 数量 大于 0 个
return wskey_list # 返回 WSKEY [LIST]
else: # 判断分支
logger.info("JD_WSCK变量未启用") # 标准日志输出
sys.exit(1) # 脚本退出
else: # 判断分支
logger.info("未添加JD_WSCK变量") # 标准日志输出
sys.exit(0) # 脚本退出
# 返回值 list[jd_cookie]
def get_ck(): # 方法 获取 JD_COOKIE值 [系统变量传递] <! 此方法未使用 !>
if "JD_COOKIE" in os.environ: # 判断 JD_COOKIE是否存在于环境变量
ck_list = os.environ['JD_COOKIE'].split('&') # 读取系统变量 以 & 分割变量
if len(ck_list) > 0: # 判断 WSKEY 数量 大于 0 个
return ck_list # 返回 JD_COOKIE [LIST]
else: # 判断分支
logger.info("JD_COOKIE变量未启用") # 标准日志输出
sys.exit(1) # 脚本退出
else: # 判断分支
logger.info("未添加JD_COOKIE变量") # 标准日志输出
sys.exit(0) # 脚本退出
# 返回值 bool
def check_ck(ck): # 方法 检查 Cookie有效性 使用变量传递 单次调用
searchObj = re.search(r'pt_pin=([^;\s]+)', ck, re.M | re.I) # 正则检索 pt_pin
if searchObj: # 真值判断
pin = searchObj.group(1) # 取值
else: # 判断分支
pin = ck.split(";")[1] # 取值 使用 ; 分割
if "WSKEY_UPDATE_HOUR" in os.environ: # 判断 WSKEY_UPDATE_HOUR是否存在于环境变量
updateHour = 23 # 更新间隔23小时
if os.environ["WSKEY_UPDATE_HOUR"].isdigit(): # 检查是否为 DEC值
updateHour = int(os.environ["WSKEY_UPDATE_HOUR"]) # 使用 int化数字
nowTime = time.time() # 获取时间戳 赋值
updatedAt = 0.0 # 赋值
searchObj = re.search(r'__time=([^;\s]+)', ck, re.M | re.I) # 正则检索 [__time=]
if searchObj: # 真值判断
updatedAt = float(searchObj.group(1)) # 取值 [float]类型
if nowTime - updatedAt >= (updateHour * 60 * 60) - (10 * 60): # 判断时间操作
logger.info(str(pin) + ";即将到期或已过期\n") # 标准日志输出
return False # 返回 Bool类型 False
else: # 判断分支
remainingTime = (updateHour * 60 * 60) - (nowTime - updatedAt) # 时间运算操作
hour = int(remainingTime / 60 / 60) # 时间运算操作 [int]
minute = int((remainingTime % 3600) / 60) # 时间运算操作 [int]
logger.info(str(pin) + ";未到期,{0}时{1}分后更新\n".format(hour, minute)) # 标准日志输出
return True # 返回 Bool类型 True
elif "WSKEY_DISCHECK" in os.environ: # 判断分支 WSKEY_DISCHECK 是否存在于系统变量
logger.info("不检查账号有效性\n--------------------\n") # 标准日志输出
return False # 返回 Bool类型 False
else: # 判断分支
url = 'https://me-api.jd.com/user_new/info/GetJDUserInfoUnion' # 设置JD_API接口地址
headers = {
'Cookie': ck,
'Referer': 'https://home.m.jd.com/myJd/home.action',
'user-agent': ua
} # 设置 HTTP头
try: # 异常捕捉
res = requests.get(url=url, headers=headers, verify=False, timeout=10, allow_redirects=False) # 进行 HTTP请求[GET] 超时 10秒
except Exception as err: # 异常捕捉
logger.debug(str(err)) # 调试日志输出
logger.info("JD接口错误 请重试或者更换IP") # 标准日志输出
return False # 返回 Bool类型 False
else: # 判断分支
if res.status_code == 200: # 判断 JD_API 接口是否为 200 [HTTP_OK]
code = int(json.loads(res.text)['retcode']) # 使用 Json模块对返回数据取值 int([retcode])
if code == 0: # 判断 code值
logger.info(str(pin) + ";状态正常\n") # 标准日志输出
return True # 返回 Bool类型 True
else: # 判断分支
logger.info(str(pin) + ";状态失效\n")
return False # 返回 Bool类型 False
else: # 判断分支
logger.info("JD接口错误码: " + str(res.status_code)) # 标注日志输出
return False # 返回 Bool类型 False
# 返回值 bool jd_ck
def getToken(wskey): # 方法 获取 Wskey转换使用的 Token 由 JD_API 返回 这里传递 wskey
if flag == 'bak':
return getToken_bak(wskey)
try: # 异常捕捉
url = str(base64.b64decode(url_t).decode()) + 'api/genToken' # 设置云端服务器地址 路由为 genToken
header = {"User-Agent": ua} # 设置 HTTP头
params = requests.get(url=url, headers=header, verify=False, timeout=20).json() # 设置 HTTP请求参数 超时 20秒 Json解析
except Exception as err: # 异常捕捉
logger.info("Params参数获取失败") # 标准日志输出
logger.debug(str(err)) # 调试日志输出
return False, wskey # 返回 -> False[Bool], Wskey
headers = {
'cookie': wskey,
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'charset': 'UTF-8',
'accept-encoding': 'br,gzip,deflate',
'user-agent': ua
} # 设置 HTTP头
url = 'https://api.m.jd.com/client.action' # 设置 URL地址
data = 'body=%7B%22to%22%3A%22https%253a%252f%252fplogin.m.jd.com%252fjd-mlogin%252fstatic%252fhtml%252fappjmp_blank.html%22%7D&' # 设置 POST 载荷
try: # 异常捕捉
res = requests.post(url=url, params=params, headers=headers, data=data, verify=False,
timeout=10) # HTTP请求 [POST] 超时 10秒
res_json = json.loads(res.text) # Json模块 取值
tokenKey = res_json['tokenKey'] # 取出TokenKey
except Exception as err: # 异常捕捉
logger.info("JD_WSKEY接口抛出错误 尝试重试 更换IP") # 标准日志输出
logger.info(str(err)) # 标注日志输出
return False, wskey # 返回 -> False[Bool], Wskey
else: # 判断分支
return appjmp(wskey, tokenKey) # 传递 wskey, Tokenkey 执行方法 [appjmp]
# 备用
def getToken_bak(wskey): # 方法 获取 Wskey转换使用的 Token 由 JD_API 返回 这里传递 wskey
try: # 异常捕捉
url = str(base64.b64decode('aHR0cHM6Ly9hcGkubm9sYW5zdG9yZS5jYy8=').decode()) + 'sign' # 设置云端服务器地址 路由为 genToken
header = {"Content-Type": "application/json"} # 设置 HTTP头
data = {'body':{"to":"https%3a%2f%2fplogin.m.jd.com%2fjd-mlogin%2fstatic%2fhtml%2fappjmp_blank.html"},'fn':'genToken'}
params = requests.post(url=url, headers=header, json=data, verify=False, timeout=20).json() # 设置 HTTP请求参数 超时 20秒 Json解析
params = 'functionId=genToken&'+params['body']
except Exception as err: # 异常捕捉
logger.info("Params参数获取失败") # 标准日志输出
logger.debug(str(err)) # 调试日志输出
return False, wskey # 返回 -> False[Bool], Wskey
headers = {
'cookie': wskey,
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'charset': 'UTF-8',
'accept-encoding': 'br,gzip,deflate',
'user-agent': ua
} # 设置 HTTP头
url = 'https://api.m.jd.com/client.action' # 设置 URL地址
data = 'body=%7B%22to%22%3A%22https%253a%252f%252fplogin.m.jd.com%252fjd-mlogin%252fstatic%252fhtml%252fappjmp_blank.html%22%7D&' # 设置 POST 载荷
try: # 异常捕捉
res = requests.post(url=url, params=params, headers=headers, data=data, verify=False,
timeout=10) # HTTP请求 [POST] 超时 10秒
res_json = json.loads(res.text) # Json模块 取值
tokenKey = res_json['tokenKey'] # 取出TokenKey
except Exception as err: # 异常捕捉
logger.info("JD_WSKEY接口抛出错误 尝试重试 更换IP") # 标准日志输出
logger.info(str(err)) # 标注日志输出
return False, wskey # 返回 -> False[Bool], Wskey
else: # 判断分支
return appjmp(wskey, tokenKey) # 传递 wskey, Tokenkey 执行方法 [appjmp]
# 返回值 bool jd_ck
def appjmp(wskey, tokenKey): # 方法 传递 wskey & tokenKey
wskey = "pt_" + str(wskey.split(";")[0]) # 变量组合 使用 ; 分割变量 拼接 pt_
if tokenKey == 'xxx': # 判断 tokenKey返回值
logger.info(str(wskey) + ";疑似IP风控等问题 默认为失效\n--------------------\n") # 标准日志输出
return False, wskey # 返回 -> False[Bool], Wskey
headers = {
'User-Agent': ua,
'accept': 'accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'x-requested-with': 'com.jingdong.app.mall'
} # 设置 HTTP头
params = {
'tokenKey': tokenKey,
'to': 'https://plogin.m.jd.com/jd-mlogin/static/html/appjmp_blank.html'
} # 设置 HTTP_URL 参数
url = 'https://un.m.jd.com/cgi-bin/app/appjmp' # 设置 URL地址
try: # 异常捕捉
res = requests.get(url=url, headers=headers, params=params, verify=False, allow_redirects=False,
timeout=20) # HTTP请求 [GET] 阻止跳转 超时 20秒
except Exception as err: # 异常捕捉
logger.info("JD_appjmp 接口错误 请重试或者更换IP\n") # 标准日志输出
logger.info(str(err)) # 标准日志输出
return False, wskey # 返回 -> False[Bool], Wskey
else: # 判断分支
try: # 异常捕捉
res_set = res.cookies.get_dict() # 从res cookie取出
pt_key = 'pt_key=' + res_set['pt_key'] # 取值 [pt_key]
pt_pin = 'pt_pin=' + res_set['pt_pin'] # 取值 [pt_pin]
if "WSKEY_UPDATE_HOUR" in os.environ: # 判断是否在系统变量中启用 WSKEY_UPDATE_HOUR
jd_ck = str(pt_key) + ';' + str(pt_pin) + ';__time=' + str(time.time()) + ';' # 拼接变量
else: # 判断分支
jd_ck = str(pt_key) + ';' + str(pt_pin) + ';' # 拼接变量
except Exception as err: # 异常捕捉
logger.info("JD_appjmp提取Cookie错误 请重试或者更换IP\n") # 标准日志输出
logger.info(str(err)) # 标准日志输出
return False, wskey # 返回 -> False[Bool], Wskey
else: # 判断分支
if 'fake' in pt_key: # 判断 pt_key中 是否存在fake
logger.info(str(wskey) + ";WsKey状态失效\n") # 标准日志输出
return False, wskey # 返回 -> False[Bool], Wskey
else: # 判断分支
logger.info(str(wskey) + ";WsKey状态正常\n") # 标准日志输出
return True, jd_ck # 返回 -> True[Bool], jd_ck
def update(): # 方法 脚本更新模块
up_ver = int(cloud_arg['update']) # 云端参数取值 [int]
if ver >= up_ver: # 判断版本号大小
logger.info("当前脚本版本: " + str(ver)) # 标准日志输出
logger.info("--------------------\n") # 标准日志输出
else: # 判断分支
logger.info("当前脚本版本: " + str(ver) + "新版本: " + str(up_ver)) # 标准日志输出
logger.info("存在新版本, 请更新脚本后执行") # 标准日志输出
logger.info("--------------------\n") # 标准日志输出
text = '当前脚本版本: {0}新版本: {1}, 请更新脚本~!'.format(ver, up_ver) # 设置发送内容
ql_send(text)
# sys.exit(0) # 退出脚本 [未启用]
def ql_check(port): # 方法 检查青龙端口
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Socket模块初始化
sock.settimeout(2) # 设置端口超时
try: # 异常捕捉
sock.connect(('127.0.0.1', port)) # 请求端口
except Exception as err: # 捕捉异常
logger.debug(str(err)) # 调试日志输出
sock.close() # 端口关闭
return False # 返回 -> False[Bool]
else: # 分支判断
sock.close() # 关闭端口
return True # 返回 -> True[Bool]
def serch_ck(pin): # 方法 搜索 Pin
for i in range(len(envlist)): # For循环 变量[envlist]的数量
if "name" not in envlist[i] or envlist[i]["name"] != "JD_COOKIE": # 判断 envlist内容
continue # 继续循环
if pin in envlist[i]['value']: # 判断envlist取值['value']
value = envlist[i]['value'] # 取值['value']
id = envlist[i][ql_id] # 取值 [ql_id](变量)
logger.info(str(pin) + "检索成功\n") # 标准日志输出
return True, value, id # 返回 -> True[Bool], value, id
else: # 判断分支
continue # 继续循环
logger.info(str(pin) + "检索失败\n") # 标准日志输出
return False, 1 # 返回 -> False[Bool], 1
def get_env(): # 方法 读取变量
url = ql_url + 'api/envs'
try: # 异常捕捉
res = s.get(url) # HTTP请求 [GET] 使用 session
except Exception as err: # 异常捕捉
logger.debug(str(err)) # 调试日志输出
logger.info("\n青龙环境接口错误") # 标准日志输出
sys.exit(1) # 脚本退出
else: # 判断分支
data = json.loads(res.text)['data'] # 使用Json模块提取值[data]
return data # 返回 -> data
def check_id(): # 方法 兼容青龙老版本与新版本 id & _id的问题
url = ql_url + 'api/envs'
try: # 异常捕捉
res = s.get(url).json() # HTTP[GET] 请求 使用 session
except Exception as err: # 异常捕捉
logger.debug(str(err)) # 调试日志输出
logger.info("\n青龙环境接口错误") # 标准日志输出
sys.exit(1) # 脚本退出
else: # 判断分支
if '_id' in res['data'][0]: # 判断 [_id]
logger.info("使用 _id 键值") # 标准日志输出
return '_id' # 返回 -> '_id'
else: # 判断分支
logger.info("使用 id 键值") # 标准日志输出
return 'id' # 返回 -> 'id'
def ql_update(e_id, n_ck): # 方法 青龙更新变量 传递 id cookie
url = ql_url + 'api/envs'
data = {
"name": "JD_COOKIE",
"value": n_ck,
ql_id: e_id
} # 设置 HTTP POST 载荷
data = json.dumps(data) # json模块格式化
s.put(url=url, data=data) # HTTP [PUT] 请求 使用 session
ql_enable(eid) # 调用方法 ql_enable 传递 eid
def ql_enable(e_id): # 方法 青龙变量启用 传递值 eid
url = ql_url + 'api/envs/enable'
data = '["{0}"]'.format(e_id) # 格式化 POST 载荷
res = json.loads(s.put(url=url, data=data).text) # json模块读取 HTTP[PUT] 的返回值
if res['code'] == 200: # 判断返回值为 200
logger.info("\n账号启用\n--------------------\n") # 标准日志输出
return True # 返回 ->True
else: # 判断分支
logger.info("\n账号启用失败\n--------------------\n") # 标准日志输出
return False # 返回 -> Fasle
def ql_disable(e_id): # 方法 青龙变量禁用 传递 eid
url = ql_url + 'api/envs/disable'
data = '["{0}"]'.format(e_id) # 格式化 POST 载荷
res = json.loads(s.put(url=url, data=data).text) # json模块读取 HTTP[PUT] 的返回值
if res['code'] == 200: # 判断返回值为 200
logger.info("\n账号禁用成功\n--------------------\n") # 标准日志输出
return True # 返回 ->True
else: # 判断分支
logger.info("\n账号禁用失败\n--------------------\n") # 标准日志输出
return False # 返回 -> Fasle
def ql_insert(i_ck): # 方法 插入新变量
data = [{"value": i_ck, "name": "JD_COOKIE"}] # POST数据载荷组合
data = json.dumps(data) # Json格式化数据
url = ql_url + 'api/envs'
s.post(url=url, data=data) # HTTP[POST]请求 使用session
logger.info("\n账号添加完成\n--------------------\n") # 标准日志输出
def cloud_info(): # 方法 云端信息
url = str(base64.b64decode(url_t).decode()) + 'api/check_api' # 设置 URL地址 路由 [check_api]
for i in range(3): # For循环 3次
try: # 异常捕捉
headers = {"authorization": "Bearer Shizuku"} # 设置 HTTP头
res = requests.get(url=url, verify=False, headers=headers, timeout=20).text # HTTP[GET] 请求 超时 20秒
except requests.exceptions.ConnectTimeout: # 异常捕捉
logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出
time.sleep(1) # 休眠 1秒
continue # 循环继续
except requests.exceptions.ReadTimeout: # 异常捕捉
logger.info("\n获取云端参数超时, 正在重试!" + str(i)) # 标准日志输出
time.sleep(1) # 休眠 1秒
continue # 循环继续
except Exception as err: # 异常捕捉
logger.info("\n未知错误云端, 退出脚本!") # 标准日志输出
logger.debug(str(err)) # 调试日志输出
sys.exit(1) # 脚本退出
else: # 分支判断
try: # 异常捕捉
c_info = json.loads(res) # json读取参数
except Exception as err: # 异常捕捉
logger.info("云端参数解析失败") # 标准日志输出
logger.debug(str(err)) # 调试日志输出
sys.exit(1) # 脚本退出
else: # 分支判断
return c_info # 返回 -> c_info
def check_cloud(): # 方法 云端地址检查
url_list = ['aHR0cHM6Ly9hcGkubW9tb2UubWwv', 'aHR0cHM6Ly9hcGkubGltb2UuZXUub3JnLw==', 'aHR0cHM6Ly9hcGkuaWxpeWEuY2Yv'] # URL list Encode
for i in url_list: # for循环 url_list
url = str(base64.b64decode(i).decode()) # 设置 url地址 [str]
try: # 异常捕捉
requests.get(url=url, verify=False, timeout=10) # HTTP[GET]请求 超时 10秒
except Exception as err: # 异常捕捉
logger.debug(str(err)) # 调试日志输出
continue # 循环继续
else: # 分支判断
info = ['HTTPS', 'Eu_HTTPS', 'CloudFlare'] # 输出信息[List]
logger.info(str(info[url_list.index(i)]) + " Server Check OK\n--------------------\n") # 标准日志输出
return i # 返回 ->i
logger.info("\n云端地址全部失效, 请检查网络!") # 标准日志输出
#ql_send('云端地址失效. 请联系作者或者检查网络.') # 推送消息
return 403
#sys.exit(1) # 脚本退出
def check_port(): # 方法 检查变量传递端口
logger.info("\n--------------------\n") # 标准日志输出
if "QL_PORT" in os.environ: # 判断 系统变量是否存在[QL_PORT]
try: # 异常捕捉
port = int(os.environ['QL_PORT']) # 取值 [int]
except Exception as err: # 异常捕捉
logger.debug(str(err)) # 调试日志输出
logger.info("变量格式有问题...\n格式: export QL_PORT=\"端口号\"") # 标准日志输出
logger.info("使用默认端口5700") # 标准日志输出
return 5700 # 返回端口 5700
else: # 判断分支
port = 5700 # 默认5700端口
if not ql_check(port): # 调用方法 [ql_check] 传递 [port]
logger.info(str(port) + "端口检查失败, 如果改过端口, 请在变量中声明端口 \n在config.sh中加入 export QL_PORT=\"端口号\"") # 标准日志输出
logger.info("\n如果你很确定端口没错, 还是无法执行, 在GitHub给我发issus\n--------------------\n") # 标准日志输出
sys.exit(1) # 脚本退出
else: # 判断分支
logger.info(str(port) + "端口检查通过") # 标准日志输出
return port # 返回->port
if __name__ == '__main__': # Python主函数执行入口
port = check_port() # 调用方法 [check_port] 并赋值 [port]
ql_url = 'http://127.0.0.1:{0}/'.format(port)
token = ql_login() # 调用方法 [ql_login] 并赋值 [token]
s = requests.session() # 设置 request session方法
s.headers.update({"authorization": "Bearer " + str(token)}) # 增加 HTTP头认证
s.headers.update({"Content-Type": "application/json;charset=UTF-8"}) # 增加 HTTP头 json 类型
ql_id = check_id() # 调用方法 [check_id] 并赋值 [ql_id]
url_t = check_cloud() # 调用方法 [check_cloud] 并赋值 [url_t]
flag = ''
if url_t == 403:
logger.info("\n尝试使用nolan接口请求\n")
ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'
flag = 'bak'
else:
cloud_arg = cloud_info() # 调用方法 [cloud_info] 并赋值 [cloud_arg]
update() # 调用方法 [update]
ua = cloud_arg['User-Agent'] # 设置全局变量 UA
wslist = get_wskey() # 调用方法 [get_wskey] 并赋值 [wslist]
envlist = get_env() # 调用方法 [get_env] 并赋值 [envlist]
if "WSKEY_SLEEP" in os.environ and str(os.environ["WSKEY_SLEEP"]).isdigit(): # 判断变量[WSKEY_SLEEP]是否为数字类型
sleepTime = int(os.environ["WSKEY_SLEEP"]) # 获取变量 [int]
else: # 判断分支
sleepTime = 10 # 默认休眠时间 10秒
for ws in wslist: # wslist变量 for循环 [wslist -> ws]
wspin = ws.split(";")[0] # 变量分割 ;
if "pin" in wspin: # 判断 pin 是否存在于 [wspin]
wspin = "pt_" + wspin + ";" # 封闭变量
return_serch = serch_ck(wspin) # 变量 pt_pin 搜索获取 key eid
if return_serch[0]: # bool: True 搜索到账号
jck = str(return_serch[1]) # 拿到 JD_COOKIE
if not check_ck(jck): # bool: False 判定 JD_COOKIE 有效性
tryCount = 1 # 重试次数 1次
if "WSKEY_TRY_COUNT" in os.environ: # 判断 [WSKEY_TRY_COUNT] 是否存在于系统变量
if os.environ["WSKEY_TRY_COUNT"].isdigit(): # 判断 [WSKEY_TRY_COUNT] 是否为数字
tryCount = int(os.environ["WSKEY_TRY_COUNT"]) # 设置 [tryCount] int
for count in range(tryCount): # for循环 [tryCount]
count += 1 # 自增
return_ws = getToken(ws) # 使用 WSKEY 请求获取 JD_COOKIE bool jd_ck
if return_ws[0]: # 判断 [return_ws]返回值 Bool类型
break # 中断循环
if count < tryCount: # 判断循环次
logger.info("{0} 秒后重试,剩余次数:{1}\n".format(sleepTime, tryCount - count)) # 标准日志输出
time.sleep(sleepTime) # 脚本休眠 使用变量 [sleepTime]
if return_ws[0]: # 判断 [return_ws]返回值 Bool类型
nt_key = str(return_ws[1]) # 从 return_ws[1] 取出 -> nt_key
# logger.info("wskey转pt_key成功", nt_key) # 标准日志输出 [未启用]
logger.info("wskey转换成功") # 标准日志输出
eid = return_serch[2] # 从 return_serch 拿到 eid
ql_update(eid, nt_key) # 函数 ql_update 参数 eid JD_COOKIE
else: # 判断分支
if "WSKEY_AUTO_DISABLE" in os.environ: # 从系统变量中获取 WSKEY_AUTO_DISABLE
logger.info(str(wspin) + "账号失效") # 标准日志输出
text = "账号: {0} WsKey疑似失效".format(wspin) # 设置推送内容
else: # 判断分支
eid = return_serch[2] # 读取 return_serch[2] -> eid
logger.info(str(wspin) + "账号禁用") # 标准日志输出
ql_disable(eid) # 执行方法[ql_disable] 传递 eid
text = "账号: {0} WsKey疑似失效, 已禁用Cookie".format(wspin) # 设置推送内容
ql_send(text)
else: # 判断分支
logger.info(str(wspin) + "账号有效") # 标准日志输出
eid = return_serch[2] # 读取 return_serch[2] -> eid
ql_enable(eid) # 执行方法[ql_enable] 传递 eid
logger.info("--------------------\n") # 标准日志输出
else: # 判断分支
logger.info("\n新wskey\n") # 标准日志分支
return_ws = getToken(ws) # 使用 WSKEY 请求获取 JD_COOKIE bool jd_ck
if return_ws[0]: # 判断 (return_ws[0]) 类型: [Bool]
nt_key = str(return_ws[1]) # return_ws[1] -> nt_key
logger.info("wskey转换成功\n") # 标准日志输出
ql_insert(nt_key) # 调用方法 [ql_insert]
logger.info("暂停{0}秒\n".format(sleepTime)) # 标准日志输出
time.sleep(sleepTime) # 脚本休眠
else: # 判断分支
logger.info("WSKEY格式错误\n--------------------\n") # 标准日志输出
logger.info("执行完成\n--------------------") # 标准日志输出
sys.exit(0) # 脚本退出
# Enjoy