This repository has been archived by the owner on May 30, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
160 lines (122 loc) · 4.4 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import collections
import os.path
import sqlite3
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
import flask
from scan import File
from scan import Repo
from util import db_connect
# Flask application object; the ``@app.route`` decorators below register
# their view functions on it and ``main()`` runs it.
app = flask.Flask(__name__)
def _get_repos(db: sqlite3.Connection) -> List[Repo]:
    """Load every repository recorded in the ``data`` table.

    Rows are read ordered by ``repo, filename`` so that all files of one
    repository are adjacent and can be grouped in a single pass.  Each row
    is unpacked positionally as ``(repo, filename, rev, account_type,
    star_count, checksum)``; the repository-level fields are taken from the
    first row of each group.

    :param db: open connection to the database holding the ``data`` table
    :returns: one ``Repo`` per distinct repository name, ordered by
        descending ``star_count``; an empty list when the table has no rows
    """
    repos: List[Repo] = []
    # (name, rev, account_type, star_count) of the group being accumulated,
    # or ``None`` before the first row has been seen.
    current: Optional[Tuple] = None
    files: List[File] = []

    query = 'SELECT * FROM data ORDER BY repo, filename'
    for name, filename, rev, account_type, star_count, checksum in (
            db.execute(query)
    ):
        if current is not None and name != current[0]:
            # a new repository starts: flush the finished group
            repos.append(Repo(*current, tuple(files)))
            current, files = None, []
        if current is None:
            current = (name, rev, account_type, star_count)
        files.append(File(filename, checksum))

    # flush the trailing group (skipped entirely when there were no rows)
    if current is not None:
        repos.append(Repo(*current, tuple(files)))

    # list.sort is stable, so equal star counts keep their name ordering
    repos.sort(key=lambda repo: -repo.star_count)
    return repos
def _get_vulnerable_done(
        db: sqlite3.Connection,
) -> Tuple[Dict[str, Optional[bool]], Set[str]]:
    """Read the review state stored in the ``status`` and ``done`` tables.

    :param db: open connection to the database holding both tables
    :returns: a pair ``(vulnerable, done)``.  ``vulnerable`` maps the first
        column of each ``status`` row to the truthiness of its second
        column and yields ``None`` for keys that have no row; ``done`` is
        the set of values found in the single-column ``done`` table.
    """
    # a defaultdict so that lookups of unknown keys produce ``None``
    vulnerable: Dict[str, Optional[bool]] = collections.defaultdict(
        lambda: None,
    )
    for key, flag in db.execute('SELECT * FROM status'):
        vulnerable[key] = bool(flag)

    done: Set[str] = set()
    for (repo_name,) in db.execute('SELECT * FROM done'):
        done.add(repo_name)

    return vulnerable, done
@app.route('/', methods=['GET'])
def index() -> str:
    """Render ``index.html`` with all repositories and their review state.

    The template receives ``repos`` (from ``_get_repos``) plus the
    ``vulnerable`` mapping and ``done`` set from ``_get_vulnerable_done``.
    """
    with db_connect() as db:
        all_repos = _get_repos(db)
        vulnerable, done = _get_vulnerable_done(db)

    context = {'repos': all_repos, 'vulnerable': vulnerable, 'done': done}
    return flask.render_template('index.html', **context)
@app.route('/by-org', methods=['GET'])
def by_org() -> str:
    """Render ``by_org.html`` with repositories grouped by ``repo.repo1``.

    Groups are ordered by the descending sum of their members'
    ``star_count``; within a group the order produced by ``_get_repos`` is
    kept.  ``repo1`` is presumably the owner part of the repository name;
    verify against the ``Repo`` definition in ``scan``.
    """
    with db_connect() as db:
        all_repos = _get_repos(db)
        vulnerable, done = _get_vulnerable_done(db)

    grouped = collections.defaultdict(list)
    for entry in all_repos:
        grouped[entry.repo1].append(entry)

    def _negated_star_total(item):
        # negate so that an ascending sort puts the largest total first
        _, members = item
        return -sum(member.star_count for member in members)

    ordered = {
        org: members
        for org, members in sorted(grouped.items(), key=_negated_star_total)
    }
    return flask.render_template(
        'by_org.html',
        by_org=ordered,
        vulnerable=vulnerable,
        done=done,
    )
@app.route('/repo/<repo1>/<repo2>', methods=['GET'])
def repo(repo1: str, repo2: str) -> str:
    """Render ``repo.html`` for the repository named ``<repo1>/<repo2>``.

    The repository's rows are read from the ``data`` table ordered by
    filename, and the text of every listed file is loaded from
    ``files/<repo>/<rev>/<filename>`` so the template can display it.

    :param repo1: first path segment of the repository name
    :param repo2: second path segment of the repository name
    :returns: the rendered page
    :raises werkzeug.exceptions.NotFound: (HTTP 404, via ``flask.abort``)
        when the ``data`` table has no rows for the requested repository
    """
    repo_s = f'{repo1}/{repo2}'
    with db_connect() as db:
        query = 'SELECT * FROM data WHERE repo = ? ORDER BY filename'
        rows = db.execute(query, (repo_s,)).fetchall()
        if not rows:
            # unknown repository: answer 404 rather than letting the
            # unpacking of the first row fail with an unhandled error
            flask.abort(404)

        files = tuple(
            File(filename, checksum)
            for _, filename, _, _, _, checksum in rows
        )
        # repository-level columns are taken from the first row
        name, _, rev, account_type, star_count, _ = rows[0]
        # named ``repo_info`` so it does not shadow this view function
        repo_info = Repo(name, rev, account_type, star_count, files)
        vulnerable, done = _get_vulnerable_done(db)

    def _readfile(file: File) -> str:
        path = os.path.join('files', repo_info.repo, repo_info.rev, file.name)
        with open(path) as f:
            return f.read()

    contents = {file.name: _readfile(file) for file in repo_info.files}
    return flask.render_template(
        'repo.html',
        repo=repo_info,
        vulnerable=vulnerable,
        done=done,
        contents=contents,
    )
@app.route('/make-bad/<checksum>', methods=['POST'])
def make_bad(checksum: str) -> Tuple[str, int]:
    """Store status value ``1`` for ``checksum`` in the ``status`` table.

    Any existing row for the checksum is replaced.  ``_get_vulnerable_done``
    reads this value back as ``True``.

    :returns: an empty body with HTTP status 204
    """
    row = (checksum, 1)
    with db_connect() as db:
        db.execute('INSERT OR REPLACE INTO status VALUES (?, ?)', row)
    return '', 204
@app.route('/make-good/<checksum>', methods=['POST'])
def make_good(checksum: str) -> Tuple[str, int]:
    """Store status value ``0`` for ``checksum`` in the ``status`` table.

    Any existing row for the checksum is replaced.  ``_get_vulnerable_done``
    reads this value back as ``False``.

    :returns: an empty body with HTTP status 204
    """
    row = (checksum, 0)
    with db_connect() as db:
        db.execute('INSERT OR REPLACE INTO status VALUES (?, ?)', row)
    return '', 204
@app.route('/clear-status/<checksum>', methods=['POST'])
def clear_status(checksum: str) -> Tuple[str, int]:
    """Delete the ``status`` row for ``checksum``, if there is one.

    :returns: an empty body with HTTP status 204
    """
    statement = 'DELETE FROM status WHERE checksum = ?'
    with db_connect() as db:
        db.execute(statement, (checksum,))
    return '', 204
@app.route('/mark-done/<repo1>/<repo2>', methods=['POST'])
def mark_done(repo1: str, repo2: str) -> Tuple[str, int]:
    """Insert the repository ``<repo1>/<repo2>`` into the ``done`` table.

    ``INSERT OR REPLACE`` is used, so repeating the request is accepted.

    :returns: an empty body with HTTP status 204
    """
    full_name = '/'.join((repo1, repo2))
    with db_connect() as db:
        db.execute('INSERT OR REPLACE INTO done VALUES (?)', (full_name,))
    return '', 204
def main() -> int:
    """Run the Flask app on port 8000 with threading disabled.

    :returns: ``0`` once ``app.run`` returns
    """
    run_options = {'port': 8000, 'threaded': False}
    app.run(**run_options)
    return 0
if __name__ == '__main__':
    # ``exit`` is a helper added by the ``site`` module for interactive use
    # and is missing under ``python -S``; raising SystemExit always works
    # and passes main()'s return value on as the process exit status.
    raise SystemExit(main())