forked from DeppWang/youdaonote-pull
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pull.py
810 lines (714 loc) · 31.2 KB
/
pull.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import os
import re
import sys
import time
import traceback
import xml.etree.ElementTree as ET
from enum import Enum
from urllib.parse import urlparse
import requests
__author__ = 'Depp Wang ([email protected])'
# Project home page (the scheme separator "://" was missing a colon).
__github__ = 'https://github.com/DeppWang/youdaonote-pull'

# Characters replaced in file names: \ / : * ? " < > |
REGEX_SYMBOL = re.compile(r'[\\/:\*\?"<>\|]')
# Markdown image whose target contains note.youdao.com; group 1 is the target.
REGEX_IMAGE_URL = re.compile(r'!\[.*?\]\((.*?note\.youdao\.com.*?)\)')
# Markdown link to http(s)://note.youdao.com; group 1 is the link text,
# group 2 the URL, group 3 the scheme.
REGEX_ATTACH = re.compile(r'\[(.*?)\]\(((http|https)://note\.youdao\.com.*?)\)')

MARKDOWN_SUFFIX = '.md'
NOTE_SUFFIX = '.note'
class FileActionEnum(Enum):
    """Action decided for a remote file; each value is the label printed to the user."""

    IGNORE = '忽略'  # local copy is kept, nothing is downloaded
    ADD = '新增'  # no local copy exists yet
    UPDATE = '更新'  # local copy is replaced
class XmlElementConvert(object):
    """
    Conversion rules from Youdao note XML elements to Markdown text.

    Each ``convert_<tag>_func`` is looked up by element tag name and called
    with keyword arguments. The ones read here are ``text`` (text of the
    element), ``element`` (the XML element) and ``list_item`` (mapping of
    list id to list type).
    """

    @staticmethod
    def convert_para_func(**kwargs):
        """Return the paragraph text unchanged (plain text, bold, italic, strike-through, links)."""
        return kwargs.get('text')

    @staticmethod
    def convert_heading_func(**kwargs):
        """
        Return the text as a Markdown heading, e.g. ``## title``.

        The level is read from the element's ``level`` attribute (default 0);
        the letter levels ``a`` and ``b`` are mapped to 1. Empty text is
        returned as is.
        """
        level = kwargs.get('element').attrib.get('level', 0)
        if level in ('a', 'b'):
            level = 1
        text = kwargs.get('text')
        if not text:
            return text
        return ' '.join(['#' * int(level), text])

    @staticmethod
    def convert_image_func(**kwargs):
        """Return a Markdown image whose target is the text of the ``source`` child."""
        image_url = XmlElementConvert.get_text_by_key(list(kwargs.get('element')), 'source')
        return '![{text}]({image_url})'.format(text=kwargs.get('text'), image_url=image_url)

    @staticmethod
    def convert_attach_func(**kwargs):
        """Return a Markdown link built from the ``filename`` and ``resource`` children."""
        children = list(kwargs.get('element'))
        filename = XmlElementConvert.get_text_by_key(children, 'filename')
        resource_url = XmlElementConvert.get_text_by_key(children, 'resource')
        return '[{text}]({resource_url})'.format(text=filename, resource_url=resource_url)

    @staticmethod
    def convert_code_func(**kwargs):
        """
        Return a fenced code block tagged with the text of the ``language`` child.

        A line break is appended to the code when it does not already end with
        one, so that the closing fence sits on a line of its own.
        """
        language = XmlElementConvert.get_text_by_key(list(kwargs.get('element')), 'language')
        code = kwargs.get('text') or ''
        if code and not code.endswith(('\r', '\n')):
            code += '\r\n'
        return '```{language}\r\n{code}```'.format(language=language, code=code)

    @staticmethod
    def convert_todo_func(**kwargs):
        """Return the text as an unchecked task-list item."""
        return '- [ ] {text}'.format(text=kwargs.get('text'))

    @staticmethod
    def convert_quote_func(**kwargs):
        """Return the text as a block quote."""
        return '> {text}'.format(text=kwargs.get('text'))

    @staticmethod
    def convert_horizontal_line_func(**kwargs):
        """Return a Markdown horizontal rule."""
        return '---'

    @staticmethod
    def convert_list_item_func(**kwargs):
        """
        Return the text as a list item.

        The element's ``list-id`` attribute is looked up in ``list_item`` to
        decide between an unordered (``- ``) and an ordered (``1. ``) item.
        """
        element = kwargs.get('element')
        list_types = kwargs.get('list_item') or {}
        text = kwargs.get('text')
        list_type = list_types.get(element.attrib.get('list-id'))
        if list_type == 'unordered':
            return '- {text}'.format(text=text)
        if list_type == 'ordered':
            return '1. {text}'.format(text=text)
        # Unknown or undeclared list: keep the bare text instead of returning
        # None, which cannot be joined into the Markdown document.
        return text

    @staticmethod
    def convert_table_func(**kwargs):
        """
        Convert a table element to a Markdown table.

        The ``content`` child holds JSON with ``widths`` (one entry per
        column) and ``cells`` (a flat list of ``{'value': ...}``). Cells are
        grouped into rows of ``len(widths)``; an incomplete trailing row is
        dropped. A table with a single row gets a blank header row.
        :param kwargs: ``element`` is the table element
        :return: the table text, every row terminated by ``\\r\\n``
        """
        element = kwargs.get('element')
        content = XmlElementConvert.get_text_by_key(element, 'content')
        nl = '\r\n'  # \r\n so the result also reads well on Windows
        table = json.loads(content)
        column_count = len(table['widths'])

        rows = []
        current_row = []
        for cell in table['cells']:
            current_row.append(XmlElementConvert._encode_string_to_md(cell['value']))
            # A full row has been collected: store it and start the next one.
            if len(current_row) == column_count:
                rows.append(current_row)
                current_row = []

        separator = ['-'] * column_count
        if len(rows) == 1:
            # Only one row: add a blank header row above it.
            rows.insert(0, [' '] * column_count)
            rows.insert(1, separator)
        elif len(rows) > 1:
            rows.insert(1, separator)

        return ''.join(
            '|' + ''.join(' %s |' % cell for cell in row) + nl
            for row in rows
        )

    @staticmethod
    def get_text_by_key(element_children, key='text'):
        """
        Return the text of the first child whose tag contains ``key``.
        :param element_children: iterable of XML elements
        :param key: substring searched for in each child's tag
        :return: the child's text, or '' when there is no match or no text
        """
        for sub_element in element_children:
            if key in sub_element.tag:
                return sub_element.text if sub_element.text else ''
        return ''

    @staticmethod
    def _encode_string_to_md(original_text):
        """
        Escape a table cell so Markdown does not misinterpret it.

        Backslash, ``*``, ``_`` and ``#`` are backslash-escaped, then ``&``,
        ``<``, ``>``, ``“`` and ``‘`` become HTML entities, tabs are replaced
        and line breaks become ``<br>``.
        """
        if len(original_text) <= 0 or original_text == " ":
            return original_text

        # Backslash goes first so the escapes added below are not doubled.
        for char in ('\\', '*', '_', '#'):
            original_text = original_text.replace(char, '\\' + char)

        # '&' goes first so the entities added below are not escaped again.
        entities = (
            ('&', '&amp;'),
            ('<', '&lt;'),
            ('>', '&gt;'),
            ('“', '&quot;'),
            ('‘', '&#39;'),
        )
        for char, entity in entities:
            original_text = original_text.replace(char, entity)

        # NOTE(review): the replacement is reproduced as it appears in this copy
        # of the file; confirm it was not originally an entity such as &emsp;.
        original_text = original_text.replace('\t', ' ')

        # Line breaks become <br>; two-character sequences are handled first.
        for line_break in ('\r\n', '\n\r', '\r', '\n'):
            original_text = original_text.replace(line_break, '<br>')
        return original_text
class YoudaoNoteConvert(object):
    """
    Convert downloaded Youdao note files (XML, or HTML) to Markdown.
    """

    @staticmethod
    def covert_html_to_markdown(file_path):
        """
        Convert an HTML note to Markdown.

        The file is renamed to the ``.md`` suffix and then overwritten with
        the converted text.
        :param file_path: path of the downloaded HTML note
        :return: None
        """
        # Third-party import kept local to this function.
        from markdownify import markdownify as md

        with open(file_path, 'rb') as f:
            content_str = f.read().decode('utf-8')
        # If line breaks get lost, use
        # md(content_str.replace('<br>', '<br><br>').replace('</div>', '</div><br><br>')).rstrip()
        new_content = md(content_str)
        new_file_path = ''.join([os.path.splitext(file_path)[0], MARKDOWN_SUFFIX])
        os.rename(file_path, new_file_path)
        with open(new_file_path, 'wb') as f:
            f.write(new_content.encode('utf-8'))

    @staticmethod
    def covert_xml_to_markdown_content(file_path):
        """
        Parse an XML note and return its Markdown text.

        The first child of the root declares the lists (id -> type); the
        second child is the body whose elements are converted one by one with
        the matching ``XmlElementConvert.convert_<tag>_func``. Elements
        without a converter contribute their plain text.
        :param file_path: path of the XML note
        :return: Markdown text, elements separated by one blank line
        :raises xml.etree.ElementTree.ParseError: when the file is not valid XML
        """
        note_element = ET.parse(file_path).getroot()

        # Map each declared list id to its type.
        list_item = {
            child.attrib['id']: child.attrib['type']
            for child in note_element[0]
            if 'list' in child.tag
        }

        new_content_list = []
        for element in list(note_element[1]):
            text = XmlElementConvert.get_text_by_key(list(element))
            name = element.tag.replace('{http://note.youdao.com}', '').replace('-', '_')
            convert_func = getattr(XmlElementConvert, 'convert_{}_func'.format(name), None)
            if convert_func:
                line_content = convert_func(text=text, element=element, list_item=list_item)
            else:
                # No converter for this tag: keep only the text.
                line_content = text
            # A converter that returns None (e.g. a list item whose list type
            # is unknown) would make str.join raise TypeError and lose the
            # whole note, so fall back to the plain text.
            if line_content is None:
                line_content = text
            new_content_list.append(line_content)
        return '\r\n\r\n'.join(new_content_list)

    @staticmethod
    def covert_xml_to_markdown(file_path) -> bool:
        """
        Convert an XML note to Markdown.

        The file is renamed to the ``.md`` suffix; unless it is empty it is
        then overwritten with the converted text.
        :param file_path: path of the downloaded XML note
        :return: False when the file is empty (only renamed), True otherwise
        """
        new_file_path = ''.join([os.path.splitext(file_path)[0], MARKDOWN_SUFFIX])
        # Nothing to convert in an empty file.
        if os.path.getsize(file_path) == 0:
            os.rename(file_path, new_file_path)
            return False

        # Parse before renaming, so a parse error leaves the original file in place.
        new_content = YoudaoNoteConvert.covert_xml_to_markdown_content(file_path)
        os.rename(file_path, new_file_path)
        with open(new_file_path, 'wb') as f:
            f.write(new_content.encode('utf-8'))
        return True
class ImageUpload(object):
    """
    Upload images to an image-hosting service.
    """

    @staticmethod
    def upload_to_smms(youdaonote_api, image_url, smms_secret_token) -> (str, str):
        """
        Fetch an image through ``youdaonote_api`` and upload it to sm.ms.
        :param youdaonote_api: object whose ``http_get(url)`` returns a response with ``.content``
        :param image_url: URL of the image to transfer
        :param smms_secret_token: value sent as the ``Authorization`` header
        :return: (url, error_msg); url is '' on failure, error_msg is '' on success
        """
        try:
            smfile = youdaonote_api.http_get(image_url).content
        except Exception:
            # `Exception` instead of a bare except, so that KeyboardInterrupt
            # and SystemExit are not turned into a "download failed" message.
            error_msg = '下载「{}」失败!图片可能已失效,可浏览器登录有道云笔记后,查看图片是否能正常加载'.format(image_url)
            return '', error_msg

        files = {'smfile': smfile}
        upload_api_url = 'https://sm.ms/api/v2/upload'
        headers = {'Authorization': smms_secret_token}
        # Default message, also used for rate limiting ("flood") and any
        # unexpected request failure.
        error_msg = 'SM.MS 免费版每分钟限额 20 张图片,每小时限额 100 张图片,大小限制 5 M,上传失败!「{}」未转换,' \
                    '将下载图片到本地'.format(image_url)
        try:
            res_json = requests.post(upload_api_url, headers=headers, files=files, timeout=5).json()
        except requests.exceptions.ProxyError as err:
            error_msg = '网络错误,上传「{}」到 SM.MS 失败!将下载图片到本地。错误提示:{}'.format(image_url, format(err))
            return '', error_msg
        except Exception:
            return '', error_msg

        if res_json.get('success'):
            url = res_json['data']['url']
            print('已将图片「{}」转换为「{}」'.format(image_url, url))
            return url, ''
        if res_json.get('code') == 'image_repeated':
            # The response's ``images`` field is used as the URL in this case.
            url = res_json['images']
            print('已将图片「{}」转换为「{}」'.format(image_url, url))
            return url, ''
        if res_json.get('code') == 'flood':
            return '', error_msg

        error_msg = '上传「{}」到 SM.MS 失败,请检查图片 url 或 smms_secret_token({})是否正确!将下载图片到本地'.format(
            image_url, smms_secret_token)
        return '', error_msg
class YoudaoNoteApi(object):
    """
    Thin wrapper around the Youdao Cloud Note web API.
    Background: https://depp.wang/2020/06/11/how-to-find-the-api-of-a-website-eg-note-youdao-com/
    """

    ROOT_ID_URL = 'https://note.youdao.com/yws/api/personal/file?method=getByPath&keyfrom=web&cstk={cstk}'
    DIR_MES_URL = 'https://note.youdao.com/yws/api/personal/file/{dir_id}?all=true&f=true&len=1000&sort=1' \
                  '&isReverse=false&method=listPageByParentId&keyfrom=web&cstk={cstk}'
    FILE_URL = 'https://note.youdao.com/yws/api/personal/sync?method=download&_system=macos&_systemVersion=&' \
               '_screenWidth=1280&_screenHeight=800&_appName=ynote&_appuser=0123456789abcdeffedcba9876543210&' \
               '_vendor=official-website&_launch=16&_firstTime=&_deviceId=0123456789abcdef&_platform=web&' \
               '_cityCode=110000&_cityName=&sev=j1&keyfrom=web&cstk={cstk}'

    def __init__(self, cookies_path=None):
        """
        Create the HTTP session.
        :param cookies_path: path of the cookies file; defaults to 'cookies.json'
        """
        self.session = requests.session()  # the session keeps the login state
        self.session.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/100.0.4896.88 Safari/537.36',
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'sec-ch-ua': '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
        }

        self.cookies_path = cookies_path if cookies_path else 'cookies.json'
        self.cstk = None

    def login_by_cookies(self) -> str:
        """
        Log in by loading the cookies file into the session.

        Every cookie is a sequence ``(name, value, domain, path)``. The value
        of the ``YNOTE_CSTK`` cookie is stored in ``self.cstk``; it may appear
        at any position in the list.
        :return: an error message on failure; None when the login succeeded
        """
        try:
            cookies = self._covert_cookies()
        except Exception as err:
            return format(err)

        for cookie in cookies:
            self.session.cookies.set(
                name=cookie[0], value=cookie[1], domain=cookie[2], path=cookie[3])

        # cstk is sent along with the API requests. Search the whole list so
        # that neither the cookie order nor an empty list causes a crash.
        self.cstk = next((cookie[1] for cookie in cookies if cookie[0] == 'YNOTE_CSTK'), None)
        if not self.cstk:
            return 'YNOTE_CSTK 字段为空'
        print('本次使用 Cookies 登录')

    def _covert_cookies(self) -> list:
        """
        Read the cookies file and return the value of its ``cookies`` key.
        :return: cookies
        :raises Exception: when the file is not JSON or has no ``cookies`` key
        """
        with open(self.cookies_path, 'rb') as f:
            json_str = f.read().decode('utf-8')

        try:
            cookies_dict = json.loads(json_str)
            cookies = cookies_dict['cookies']
        except Exception:
            raise Exception('转换「{}」为字典时出现错误'.format(self.cookies_path))
        return cookies

    def http_post(self, url, data=None, files=None):
        """
        POST through the session.
        :param url:
        :param data:
        :param files:
        :return: response
        """
        return self.session.post(url, data=data, files=files)

    def http_get(self, url):
        """
        GET through the session.
        :param url:
        :return: response
        """
        return self.session.get(url)

    def get_root_dir_info_id(self) -> dict:
        """
        Fetch the information of the root directory.
        :return: {
            'fileEntry': {'id': 'test_root_id', 'name': 'ROOT', ...},
            ...
        }
        """
        data = {'path': '/', 'entire': 'true', 'purge': 'false', 'cstk': self.cstk}
        return self.http_post(self.ROOT_ID_URL.format(cstk=self.cstk), data=data).json()

    def get_dir_info_by_id(self, dir_id) -> dict:
        """
        Fetch the entries of the directory with the given id.
        :return: {
            'count': 3,
            'entries': [
                 {'fileEntry': {'id': 'test_dir_id', 'name': 'test_dir', 'dir': true, ...}},
                 {'fileEntry': {'id': 'test_note_id', 'name': 'test_note', 'dir': false, ...}}
                 ...
            ]
        }
        """
        url = self.DIR_MES_URL.format(dir_id=dir_id, cstk=self.cstk)
        return self.http_get(url).json()

    def get_file_by_id(self, file_id):
        """
        Download the file with the given id.
        :param file_id:
        :return: response whose content is the raw bytes of the note
        """
        data = {'fileId': file_id, 'version': -1,
                'convert': 'true', 'editorType': 1, 'cstk': self.cstk}
        url = self.FILE_URL.format(cstk=self.cstk)
        return self.http_post(url, data=data)
class YoudaoNotePull(object):
    """
    Pull notes from Youdao Cloud Note into a local directory.
    """
    CONFIG_PATH = 'config.json'

    def __init__(self):
        # Local root directory the notes are written to
        self.root_local_dir = None
        self.youdaonote_api = None
        self.smms_secret_token = None
        # Folders to leave out of the export
        self.exclude_dirs = None
        # Local path of the note currently being processed
        self.current_note_path = None

    def get_ydnote_dir_id(self):
        """
        Read the configuration, log in and resolve the id of the directory to pull.
        :return: (dir_id, error_msg); dir_id is '' when error_msg is set
        """
        config_dict, error_msg = self._covert_config()
        if error_msg:
            return '', error_msg
        local_dir, error_msg = self._check_local_dir(local_dir=config_dict['local_dir'])
        if error_msg:
            return '', error_msg
        self.root_local_dir = local_dir
        self.exclude_dirs = config_dict['exclude_dirs']
        self.youdaonote_api = YoudaoNoteApi()
        error_msg = self.youdaonote_api.login_by_cookies()
        if error_msg:
            return '', error_msg
        self.smms_secret_token = config_dict['smms_secret_token']
        return self._get_ydnote_dir_id(ydnote_dir=config_dict['ydnote_dir'])

    def pull_dir_by_id_recursively(self, dir_id, local_dir):
        """
        Download everything below the directory with the given id, recursively.
        :param dir_id: id of the remote directory
        :param local_dir: local directory the entries are written to
        :return: None
        :raises KeyError: when the API response has no ``entries`` key
        """
        dir_info = self.youdaonote_api.get_dir_info_by_id(dir_id)
        try:
            entries = dir_info['entries']
        except KeyError:
            raise KeyError('有道云笔记修改了接口地址,此脚本暂时不能使用!请提 issue')

        for entry in entries:
            file_entry = entry['fileEntry']
            entry_id = file_entry['id']  # not named `id`, to keep the builtin usable
            name = file_entry['name']
            if not file_entry['dir']:
                self._add_or_update_file(entry_id, name, local_dir, file_entry['modifyTimeForSort'])
                continue

            # A folder is skipped when its name contains any excluded entry.
            if any(item in name for item in self.exclude_dirs):
                print("文件夹 [%s] 已排除,不需要导出。" % name)
                continue

            sub_dir = os.path.join(local_dir, name).replace('\\', '/')
            if not os.path.exists(sub_dir):
                os.mkdir(sub_dir)
            self.pull_dir_by_id_recursively(entry_id, sub_dir)

    def _covert_config(self, config_path=None) -> (dict, str):
        """
        Load the configuration file into a dict.
        :param config_path: path of the config file; defaults to CONFIG_PATH
        :return: (config_dict, error_msg); config_dict is {} when error_msg is set
        """
        config_path = config_path if config_path else self.CONFIG_PATH
        with open(config_path, 'rb') as f:
            raw_config = f.read()

        try:
            # Decoding is inside the try as well: both UnicodeDecodeError and
            # json.JSONDecodeError are ValueError subclasses.
            config_dict = json.loads(raw_config.decode('utf-8'))
        except ValueError:
            return {}, '请检查「config.json」格式是否为 utf-8 格式的 json!建议使用 Sublime 编辑「config.json」'

        key_list = ['local_dir', 'ydnote_dir', 'exclude_dirs', 'smms_secret_token']
        # Valid JSON that is not an object cannot hold the keys either.
        if not isinstance(config_dict, dict) or any(item not in config_dict for item in key_list):
            return {}, '请检查「config.json」的 key 是否分别为 local_dir, ydnote_dir, smms_secret_token, exclude_dirs'
        return config_dict, ''

    def _check_local_dir(self, local_dir, test_default_dir=None) -> (str, str):
        """
        Resolve the local target directory and create it when missing.
        :param local_dir: local directory (absolute path); when empty, a
            directory below the current working directory is used
        :param test_default_dir: name of that default directory; 'youdaonote' when empty
        :return: (local_dir, error_msg)
        """
        if not local_dir:
            add_dir = test_default_dir if test_default_dir else 'youdaonote'
            # Use / as the separator on Windows as well.
            local_dir = os.path.join(os.getcwd(), add_dir).replace('\\', '/')

        if not os.path.exists(local_dir):
            try:
                os.mkdir(local_dir)
            except OSError:
                return '', '请检查「{}」上层文件夹是否存在,并使用绝对路径!'.format(local_dir)
        return local_dir, ''

    def _get_ydnote_dir_id(self, ydnote_dir) -> (str, str):
        """
        Resolve the id of the remote directory to pull.
        :param ydnote_dir: name of a top-level remote directory; empty means the root
        :return: (dir_id, error_msg)
        """
        root_dir_info = self.youdaonote_api.get_root_dir_info_id()
        root_dir_id = root_dir_info['fileEntry']['id']
        if not ydnote_dir:
            return root_dir_id, ''

        dir_info = self.youdaonote_api.get_dir_info_by_id(root_dir_id)
        for entry in dir_info['entries']:
            file_entry = entry['fileEntry']
            if file_entry['name'] == ydnote_dir:
                return file_entry['id'], ''
        return '', '有道云笔记指定顶层目录不存在'

    def _add_or_update_file(self, file_id, file_name, local_dir, modify_time):
        """
        Download a remote file when it is new or newer than the local copy.

        Failures are printed, not raised.
        :param file_id: id of the remote file
        :param file_name: remote file name
        :param local_dir: local directory the file is written to
        :param modify_time: remote modification time, compared with the local mtime
        :return: None
        """
        file_name = self._optimize_file_name(file_name)
        youdao_file_suffix = os.path.splitext(file_name)[1]  # suffix of the remote file
        original_file_path = os.path.join(local_dir, file_name).replace('\\', '/')  # path with the remote suffix
        is_note = self._judge_is_note(file_id, youdao_file_suffix)

        # Notes always end with .md locally.
        if is_note:
            md_file_name = ''.join([os.path.splitext(file_name)[0], MARKDOWN_SUFFIX])
            local_file_path = os.path.join(local_dir, md_file_name).replace('\\', '/')
        else:
            local_file_path = original_file_path
        # Mention the original format in the progress message for notes.
        tip = ',云笔记原格式为 note' if is_note else ''

        file_action = self._get_file_action(local_file_path, modify_time)
        if file_action == FileActionEnum.IGNORE:
            return
        if file_action == FileActionEnum.UPDATE:
            # The existing file is removed first; per the original note,
            # replacing it in place fails on Windows with WinError 183.
            os.remove(local_file_path)

        try:
            self.current_note_path = local_file_path
            self._pull_file(file_id, original_file_path,
                            local_file_path, is_note, youdao_file_suffix)
            print('{}「{}」{}'.format(file_action.value, local_file_path, tip))
        except Exception as error:
            print('{}「{}」失败!请检查文件!错误提示:{}'.format(file_action.value, original_file_path, format(error)))

    def _judge_is_note(self, file_id, youdao_file_suffix):
        """
        Tell whether a remote file is a note.
        :param file_id: id of the remote file
        :param youdao_file_suffix: suffix of the remote file name
        :return: True for a ``.note`` suffix, or for a file without suffix
            whose content starts with ``<?xml``
        """
        is_note = False
        # 1. The suffix says so.
        if youdao_file_suffix == NOTE_SUFFIX:
            is_note = True
        # 2. No suffix at all: download the file and look at its first bytes.
        if not youdao_file_suffix:
            response = self.youdaonote_api.get_file_by_id(file_id)
            is_note = response.content[:5] == b"<?xml"
        return is_note

    def _pull_file(self, file_id, file_path, local_file_path, is_note, youdao_file_suffix):
        """
        Download a file and, for notes, convert it to Markdown.
        :param file_id: id of the remote file
        :param file_path: local path with the remote suffix; the download goes here
        :param local_file_path: final local path (``.md`` for notes)
        :param is_note: whether the file is a note
        :param youdao_file_suffix: suffix of the remote file name
        :return: None
        """
        # 1. Download unconditionally.
        response = self.youdaonote_api.get_file_by_id(file_id)
        with open(file_path, 'wb') as f:
            f.write(response.content)  # response.content is already bytes

        # 2. Notes are converted to Markdown.
        if is_note:
            try:
                YoudaoNoteConvert.covert_xml_to_markdown(file_path)
            except ET.ParseError:
                # Not XML: treat the note as HTML instead.
                print('此 note 笔记应该为 17 年以前新建,格式为 html,将转换为 Markdown ...')
                YoudaoNoteConvert.covert_html_to_markdown(file_path)
            except Exception:
                print('note 笔记转换 MarkDown 失败,将跳过')

        # 3. Rewrite Youdao links inside text files.
        if is_note or youdao_file_suffix == MARKDOWN_SUFFIX:
            self._migration_ydnote_url(local_file_path)

    def _get_file_action(self, local_file_path, modify_time) -> Enum:
        """
        Decide what to do with a remote file.
        :param local_file_path: local path of the file
        :param modify_time: remote modification time
        :return: FileActionEnum
        """
        # Not there yet: download it.
        if not os.path.exists(local_file_path):
            return FileActionEnum.ADD

        # The remote time is older than the local mtime: nothing changed.
        if modify_time < os.path.getmtime(local_file_path):
            logging.info('此文件「%s」不更新,忽略', local_file_path)
            return FileActionEnum.IGNORE

        # Note: an md file and a note with the same name in one directory map
        # to the same local path; the one updated later overwrites the other.
        return FileActionEnum.UPDATE

    def _optimize_file_name(self, name) -> str:
        """
        Replace every character matched by REGEX_SYMBOL with an underscore.
        :param name: file name
        :return: sanitized file name
        """
        return REGEX_SYMBOL.sub('_', name)

    def _migration_ydnote_url(self, file_path):
        """
        Rewrite the Youdao image and attachment links of a Markdown file in place.
        :param file_path: path of the Markdown file
        :return: None
        """
        with open(file_path, 'rb') as f:
            content = f.read().decode('utf-8')

        # Images
        image_urls = REGEX_IMAGE_URL.findall(content)
        if image_urls:
            print('正在转换有道云笔记「{}」中的有道云图片链接...'.format(file_path))
        for image_url in image_urls:
            image_path = self._get_new_image_path(image_url)
            if image_url == image_path:
                continue
            content = content.replace(image_url, image_path)

        # Attachments
        attach_name_and_url_list = REGEX_ATTACH.findall(content)
        if attach_name_and_url_list:
            print('正在转换有道云笔记「{}」中的有道云附件链接...'.format(file_path))
        for attach_name_and_url in attach_name_and_url_list:
            attach_name, attach_url = attach_name_and_url[0], attach_name_and_url[1]
            attach_path = self._download_ydnote_url(attach_url, attach_name)
            if not attach_path:
                continue
            content = content.replace(attach_url, attach_path)

        with open(file_path, 'wb') as f:
            f.write(content.encode())
        return

    def _get_new_image_path(self, image_url) -> str:
        """
        Return the replacement for a Youdao image link.
        :param image_url: original image URL
        :return: the SM.MS URL, else the relative local path, else ``image_url`` unchanged
        """
        # Without a token nothing is uploaded: download the image locally.
        if not self.smms_secret_token:
            image_path = self._download_ydnote_url(image_url)
            return image_path or image_url

        # With a token: upload to SM.MS.
        new_file_url, error_msg = ImageUpload.upload_to_smms(youdaonote_api=self.youdaonote_api, image_url=image_url,
                                                             smms_secret_token=self.smms_secret_token)
        if not error_msg:
            return new_file_url
        # The upload failed: fall back to a local download.
        print(error_msg)
        image_path = self._download_ydnote_url(image_url)
        return image_path or image_url

    def _download_ydnote_url(self, url, attach_name=None) -> str:
        """
        Download an image or attachment below the local root directory.

        Images go to ``images`` and get a suffix derived from the response's
        Content-Type; attachments go to ``attachments`` and get
        ``attach_name`` appended to the URL's base name.
        :param url: URL to download
        :param attach_name: attachment name; empty means the URL is an image
        :return: path relative to the current note, or '' on failure
        """
        try:
            response = self.youdaonote_api.http_get(url)
        except requests.exceptions.ProxyError as err:
            error_msg = '网络错误,「{}」下载失败。错误提示:{}'.format(url, format(err))
            print(error_msg)
            return ''

        content_type = response.headers.get('Content-Type')
        file_type = '附件' if attach_name else '图片'
        if response.status_code != 200 or not content_type:
            error_msg = '下载「{}」失败!{}可能已失效,可浏览器登录有道云笔记后,查看{}是否能正常加载'.format(url, file_type,
                                                                                                      file_type)
            print(error_msg)
            return ''

        if attach_name:
            file_dirname = 'attachments'
            file_suffix = attach_name
        else:
            file_dirname = 'images'
            # "image/png", "image/png;" and "image/png; charset=..." all give
            # ".png": anything from the first ';' on is not part of the subtype.
            content_type_arr = content_type.split(';')[0].strip().split('/')
            if len(content_type_arr) == 2:
                file_suffix = '.' + content_type_arr[1]
            else:
                file_suffix = '.jpg'

        local_file_dir = os.path.join(self.root_local_dir, file_dirname).replace('\\', '/')
        if not os.path.exists(local_file_dir):
            os.mkdir(local_file_dir)

        file_basename = os.path.basename(urlparse(url).path)
        file_name = ''.join([file_basename, file_suffix])
        local_file_path = os.path.join(local_file_dir, file_name).replace('\\', '/')

        try:
            with open(local_file_path, 'wb') as f:
                f.write(response.content)  # response.content is already bytes
        except OSError:
            error_msg = '{} {}有误!'.format(url, file_type)
            print(error_msg)
            return ''
        print('已将{}「{}」转换为「{}」'.format(file_type, url, local_file_path))

        return self._set_relative_file_path(self.current_note_path, file_name, local_file_dir)

    def _set_relative_file_path(self, file_path, file_name, local_file_dir) -> str:
        """
        Build the path of an image/attachment relative to the note that uses it.
        :param file_path: path of the note
        :param file_name: file name of the image/attachment
        :param local_file_dir: directory the image/attachment is stored in
        :return: relative path using / as separator
        """
        note_file_dir = os.path.dirname(file_path)
        rel_file_dir = os.path.relpath(local_file_dir, note_file_dir)
        return os.path.join(rel_file_dir, file_name).replace('\\', '/')
if __name__ == '__main__':
    # Script entry point: resolve the directory to pull, pull it, report the elapsed time.
    start_time = int(time.time())

    try:
        youdaonote_pull = YoudaoNotePull()
        ydnote_dir_id, error_msg = youdaonote_pull.get_ydnote_dir_id()
        if error_msg:
            print(error_msg)
            sys.exit(1)
        print('正在 pull,请稍后 ...')
        youdaonote_pull.pull_dir_by_id_recursively(ydnote_dir_id, youdaonote_pull.root_local_dir)
    except Exception as err:
        # Pick the hint by error type, most specific first (proxy errors are
        # tested before general connection errors); the rest of the handling
        # is the same for every failure.
        if isinstance(err, requests.exceptions.ProxyError):
            print('请检查网络代理设置;也有可能是调用有道云笔记接口次数达到限制,请等待一段时间后重新运行脚本,若一直失败,可删除「cookies.json」后重试')
        elif isinstance(err, requests.exceptions.ConnectionError):
            print('网络错误,请检查网络是否正常连接。若突然执行中断,可忽略此错误,重新运行脚本')
        else:
            # Any other failure, e.g. a bad link
            print('其他错误:', format(err))
        traceback.print_exc()
        print('已终止执行')
        sys.exit(1)

    end_time = int(time.time())
    print('运行完成!耗时 {} 秒'.format(str(end_time - start_time)))