forked from bingsanyu/feishu_minutes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
feishu_uploader.py
145 lines (129 loc) · 6.66 KB
/
feishu_uploader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import base64, configparser, time, uuid, zlib
from concurrent.futures import as_completed, ThreadPoolExecutor
import requests
from tqdm import tqdm
# Load runtime settings from config.ini. Interpolation is switched off so that
# '%' characters in values are taken literally (presumably for the cookie).
config = configparser.ConfigParser(interpolation=None)
config.read('config.ini', encoding='utf-8')

# Cookie string sent with every request.
minutes_cookie = config.get('Cookies', 'minutes_cookie')

# Path of the file to upload (a single file only).
file_path = config.get('上传设置', '要上传的文件所在路径(目前仅支持单个文件)')

# Proxy settings: when enabled ('是'), one address is used for both schemes;
# otherwise no proxy mapping is passed to requests.
use_proxy = config.get('代理设置', '是否使用代理(是/否)')
proxy_address = config.get('代理设置', '代理地址')
proxies = {'http': proxy_address, 'https': proxy_address} if use_proxy == '是' else None
class FeishuUploader:
    """Upload one local file to Feishu Minutes and wait for its transcript.

    :meth:`upload` runs the four steps in order: reserve quota, prepare a
    multipart upload, send the file in fixed-size blocks, then finish the
    upload and poll until transcription is reported complete.

    Every HTTP call uses the module-level ``proxies`` mapping.
    """

    _CSRF_COOKIE_NAME = 'bv_csrf_token'
    _CSRF_TOKEN_LENGTH = 36

    def __init__(self, file_path, cookie):
        """Validate the cookie and record the file's size and leading bytes.

        Args:
            file_path: Path of the file to upload.
            cookie: Raw ``Cookie`` header value; must contain ``bv_csrf_token``.

        Raises:
            Exception: If the cookie holds no 36-character ``bv_csrf_token``.
            OSError: If the file cannot be opened for reading.
        """
        self.file_path = file_path
        self.file_name = self._title_from_path(file_path)
        self.block_size = 2**20*4
        self.headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
            'cookie': cookie,
            'bv-csrf-token': self._extract_cookie_value(cookie, self._CSRF_COOKIE_NAME),
            'referer': 'https://minutes.feishu.cn/minutes/home'
        }
        if len(self.headers.get('bv-csrf-token')) != self._CSRF_TOKEN_LENGTH:
            raise Exception("cookie中不包含bv_csrf_token,请确保从请求`list?size=20&`中获取!")
        self.upload_token = None
        self.vhid = None
        self.upload_id = None
        self.object_token = None
        with open(self.file_path, 'rb') as f:
            # Seeking to the end returns the new position, i.e. the file size.
            self.file_size = f.seek(0, 2)
            f.seek(0)
            # The first 512 bytes are sent base64-encoded with the prepare request.
            self.file_header = base64.b64encode(f.read(512)).decode()

    @staticmethod
    def _extract_cookie_value(cookie, name):
        """Return the value of cookie *name* in a ``Cookie`` header, or ``''``.

        Splitting on ``;`` (instead of searching for the next ``;``) keeps the
        full value even when the pair is the last one in the header.
        """
        for pair in cookie.split(';'):
            key, sep, value = pair.strip().partition('=')
            if sep and key == name:
                return value
        return ''

    @staticmethod
    def _title_from_path(path):
        """Return the file name of *path* without directory or final extension.

        Both ``\\`` and ``/`` are treated as separators. A name whose only dot
        is the leading one (e.g. ``.env``) is returned unchanged.
        """
        base_name = path.replace('\\', '/').rsplit('/', 1)[-1]
        stem, dot, _ = base_name.rpartition('.')
        return stem if dot and stem else base_name

    def _block_count(self):
        """Return how many ``block_size`` blocks cover the file (ceiling division)."""
        return (self.file_size + self.block_size - 1) // self.block_size

    def get_quota(self):
        """Request upload quota and store the returned ``upload_token``.

        Raises:
            Exception: If the response reports that no quota is available.
        """
        file_info = f'{uuid.uuid1()}_{self.file_size}'
        quota_url = f'https://meetings.feishu.cn/minutes/api/quota?file_info[]={file_info}&language=zh_cn'
        quota_res = requests.get(quota_url, headers=self.headers, proxies=proxies).json()
        if not quota_res['data']['has_quota']:
            raise Exception("飞书妙记空间已满,请清理后重试!")
        self.upload_token = quota_res['data']['upload_token'][file_info]

    # Multipart upload (prepare)
    # doc: https://open.feishu.cn/document/server-docs/docs/drive-v1/upload/multipart-upload-file-/upload_prepare
    def prepare_upload(self):
        """Announce the upload and store ``vhid``, ``upload_id`` and ``object_token``."""
        prepare_url = 'https://meetings.feishu.cn/minutes/api/upload/prepare'
        payload = {
            # The name is sent without directory and without extension.
            'name': self._title_from_path(self.file_path),
            'file_size': self.file_size,
            'file_header': self.file_header,
            'drive_upload': True,
            'upload_token': self.upload_token,
        }
        prepare_res = requests.post(prepare_url, headers=self.headers, proxies=proxies, json=payload).json()
        self.vhid = prepare_res['data']['vhid']
        self.upload_id = prepare_res['data']['upload_id']
        self.object_token = prepare_res['data']['object_token']

    def _upload_block(self, seq):
        """Read block number *seq* from disk, POST it, and return the response.

        Each call opens the file itself, so only the blocks currently being
        sent are held in memory.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        with open(self.file_path, 'rb') as f:
            f.seek(seq * self.block_size)
            block_data = f.read(self.block_size)
        checksum = zlib.adler32(block_data) & 0xffffffff
        upload_url = f'https://internal-api-space.feishu.cn/space/api/box/stream/upload/block?upload_id={self.upload_id}&seq={seq}&size={len(block_data)}&checksum={checksum}'
        response = requests.post(upload_url, headers=self.headers, proxies=proxies, data=block_data)
        response.raise_for_status()
        return response

    # Multipart upload (upload blocks)
    # doc: https://open.feishu.cn/document/server-docs/docs/drive-v1/upload/multipart-upload-file-/upload_part
    def upload_blocks(self):
        """Upload every block with up to six worker threads, showing progress.

        Raises:
            Exception: Whatever a worker raised (I/O, network or HTTP error);
                the first failure seen is re-raised here.
        """
        block_count = self._block_count()
        with ThreadPoolExecutor(max_workers=6) as executor:
            with tqdm(total=block_count, unit='block') as progress_bar:
                futures = [executor.submit(self._upload_block, seq) for seq in range(block_count)]
                for future in as_completed(futures):
                    # result() re-raises a worker's exception instead of dropping it.
                    future.result()
                    progress_bar.update(1)

    # Multipart upload (finish)
    # doc: https://open.feishu.cn/document/server-docs/docs/drive-v1/upload/multipart-upload-file-/upload_finish
    def complete_upload(self):
        """Finish the upload on both endpoints, then wait for the transcript.

        Both JSON responses are printed as received.
        """
        num_blocks = self._block_count()

        complete_url1 = 'https://internal-api-space.feishu.cn/space/api/box/upload/finish/'
        payload = {
            'upload_id': self.upload_id,
            'num_blocks': num_blocks,
            'vhid': self.vhid,
            'risk_detection_extra': '{"source_terminal":1,"file_operate_usage":3,"locale":"zh_cn"}'
        }
        resp = requests.post(complete_url1, headers=self.headers, proxies=proxies, json=payload).json()
        print(resp)

        complete_url2 = 'https://meetings.feishu.cn/minutes/api/upload/finish'
        payload = {
            'auto_transcribe': True,
            'language': 'mixed',
            'num_blocks': num_blocks,
            'upload_id': self.upload_id,
            'vhid': self.vhid,
            'upload_token': self.upload_token,
            'object_token': self.object_token,
        }
        resp = requests.post(complete_url2, headers=self.headers, proxies=proxies, json=payload).json()
        print(resp)

        self._wait_for_transcript()

    def _wait_for_transcript(self):
        """Poll the status endpoint every 3 seconds until transcription is done."""
        start_time = time.time()
        while True:
            time.sleep(3)
            object_status_url = f'https://meetings.feishu.cn/minutes/api/batch-status?object_token[]={self.object_token}&language=zh_cn'
            object_status = requests.get(object_status_url, headers=self.headers, proxies=proxies).json()
            status = object_status['data']['status'][0]
            transcript_progress = status['transcript_progress']
            spend_time = time.time() - start_time
            # Done when object_status is 2 or the progress counter is empty.
            if status['object_status'] == 2 or transcript_progress['current'] == '':
                print(f"\n转写完成!用时{spend_time}\nhttp://meetings.feishu.cn/minutes/{status['object_token']}")
                break
            print(f"转写中...已用时{spend_time}\r", end='')

    def upload(self):
        """Run the full pipeline: quota, prepare, block upload, finish."""
        self.get_quota()
        self.prepare_upload()
        self.upload_blocks()
        self.complete_upload()
# Script entry point: upload the file named in config.ini using the configured cookie.
if __name__ == '__main__':
    uploader = FeishuUploader(file_path=file_path, cookie=minutes_cookie)
    uploader.upload()