forked from kovidgoyal/calibre
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathport.py
251 lines (210 loc) · 7.86 KB
/
port.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#!/usr/bin/env python
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2019, Kovid Goyal <kovid at kovidgoyal.net>
from __future__ import absolute_import, division, print_function, unicode_literals
import errno
import hashlib
import json
import os
import re
import subprocess
import sys
from contextlib import contextmanager
from setup import Command, build_cache_dir, dump_json
@contextmanager
def modified_file(path, modify):
with open(path, 'r+b') as f:
raw = f.read()
nraw = modify(raw)
modified = nraw != raw
if modified:
f.seek(0), f.truncate(), f.write(nraw), f.flush()
f.seek(0)
try:
yield
finally:
if modified:
f.seek(0), f.truncate(), f.write(raw)
def no2to3(raw):
return re.sub(br'^.+?\s+# no2to3$', b'', raw, flags=re.M)
def run_2to3(path, show_diffs=False):
from lib2to3.main import main
with modified_file(path, no2to3):
cmd = [
'-f', 'all',
'-f', 'buffer',
'-f', 'idioms',
'-f', 'set_literal',
'-x', 'future',
path,
]
if not show_diffs:
cmd.append('--no-diffs')
ret = main('lib2to3.fixes', cmd + [path])
return ret
class Base(Command):
scan_all_files = False
EXCLUDED_BASENAMES = {'Zeroconf.py', 'smtplib.py'}
@property
def cache_file(self):
return self.j(build_cache_dir(), self.CACHE)
def is_cache_valid(self, f, cache):
return cache.get(f) == self.file_hash(f)
def save_cache(self, cache):
dump_json(cache, self.cache_file)
def get_files(self):
from calibre import walk
for path in walk(os.path.join(self.SRC, 'calibre')):
if (path.endswith('.py') and not path.endswith('_ui.py') and not
os.path.basename(path) in self.EXCLUDED_BASENAMES):
yield path
def file_hash(self, f):
try:
return self.fhash_cache[f]
except KeyError:
self.fhash_cache[f] = ans = hashlib.sha1(open(f, 'rb').read()).hexdigest()
return ans
def run(self, opts):
self.fhash_cache = {}
cache = {}
try:
cache = json.load(open(self.cache_file, 'rb'))
except EnvironmentError as err:
if err.errno != errno.ENOENT:
raise
dirty_files = tuple(f for f in self.get_files() if not self.is_cache_valid(f, cache))
try:
if self.scan_all_files:
bad_files = []
for f in dirty_files:
if self.file_has_errors(f):
bad_files.append(f)
else:
cache[f] = self.file_hash(f)
dirty_files = bad_files
for i, f in enumerate(dirty_files):
num_left = len(dirty_files) - i - 1
self.info('\tChecking', f)
if self.file_has_errors(f):
self.report_file_error(f, num_left)
self.fhash_cache.pop(f, None)
cache[f] = self.file_hash(f)
finally:
self.save_cache(cache)
def clean(self):
try:
os.remove(self.cache_file)
except EnvironmentError as err:
if err.errno != errno.ENOENT:
raise
class To3(Base):
description = 'Run 2to3 and fix anything it reports'
CACHE = 'check2to3.json'
def report_file_error(self, f, num_left):
run_2to3(f, show_diffs=True)
self.info('%d files left to check' % num_left)
raise SystemExit(1)
def file_has_errors(self, f):
from polyglot.io import PolyglotStringIO
oo, oe = sys.stdout, sys.stderr
sys.stdout = sys.stderr = buf = PolyglotStringIO()
try:
ret = run_2to3(f)
finally:
sys.stdout, sys.stderr = oo, oe
if ret:
raise SystemExit('Could not parse: ' + f)
output = buf.getvalue()
return re.search(r'^RefactoringTool: No changes to ' + f, output, flags=re.M) is None
def edit_file(f):
subprocess.Popen([
'vim', '-S', os.path.join(Command.SRC, '../session.vim'), '-f', f
]).wait()
class UnicodeCheck(Base):
description = 'Check for unicode porting status'
CACHE = 'check_unicode.json'
scan_all_files = True
def get_error_statement(self, f):
uni_pat = re.compile(r'from __future__ import .*\bunicode_literals\b')
str_pat = re.compile(r'\bstr\(')
has_unicode_literals = False
has_str_calls = False
num_lines = 0
for i, line in enumerate(open(f, 'rb')):
line = line.decode('utf-8')
if not line.strip():
continue
num_lines += 1
if not has_unicode_literals and uni_pat.match(line) is not None:
has_unicode_literals = True
if not has_str_calls and str_pat.search(line) is not None:
has_str_calls = True
if has_unicode_literals and has_str_calls:
break
if num_lines < 1:
return
ans = None
if not has_unicode_literals:
if has_str_calls:
ans = 'The file %s does not use unicode literals and has str() calls'
else:
ans = 'The file %s does not use unicode literals'
elif has_str_calls:
ans = 'The file %s has str() calls'
return ans % f if ans else None
def file_has_errors(self, f):
return self.get_error_statement(f) is not None
def report_file_error(self, f, num_left):
edit_file(f)
self.info('%d files left to check' % num_left)
if self.file_has_errors(f):
raise SystemExit(self.get_error_statement(f))
def has_import(text, module, name):
pat = re.compile(r'^from\s+{}\s+import\s+.*\b{}\b'.format(module, name), re.MULTILINE)
if pat.search(text) is not None:
return True
pat = re.compile(r'^from\s+{}\s+import\s+\([^)]*\b{}\b'.format(module, name), re.MULTILINE | re.DOTALL)
if pat.search(text) is not None:
return True
return False
class IteratorsCheck(Base):
description = 'Check for builtins changed to return iterators porting status'
CACHE = 'check_iterators.json'
def get_errors_in_file(self, f):
pat = re.compile(r'\b(range|map|filter|zip)\(')
with open(f, 'rb') as f:
text = f.read().decode('utf-8')
matches = tuple(pat.finditer(text))
if not matches:
return []
ans = []
names = {m.group(1) for m in matches}
imported_names = {n for n in names if has_import(text, 'polyglot.builtins', n)}
safe_funcs = 'list|tuple|set|frozenset|join'
func_pat = r'({})\('.format(safe_funcs)
for_pat = re.compile(r'\bfor\s+.+?\s+\bin\b')
for i, line in enumerate(text.splitlines()):
m = pat.search(line)
if m is not None:
itname = m.group(1)
if itname in imported_names:
continue
start = m.start()
if start > 0:
if line[start-1] == '*':
continue
if line[start-1] == '(':
if re.search(func_pat + itname, line) is not None:
continue
fm = for_pat.search(line)
if fm is not None and fm.start() < start:
continue
ans.append('%s:%s' % (i, itname))
return ans
def file_has_errors(self, f):
return bool(self.get_errors_in_file(f))
def report_file_error(self, f, num_left):
edit_file(f)
self.info('%d files left to check' % num_left)
if self.file_has_errors(f):
raise SystemExit('\n'.join(self.get_errors_in_file(f)))