This repository has been archived by the owner on Apr 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.py
99 lines (82 loc) · 3.81 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import hmac
import hashlib
import utils
import orjson
import simdjson
import logging
from aiocache import cached # type: ignore
from flask import Flask, request
from flask_restful import reqparse # type: ignore
from werkzeug.exceptions import HTTPException
# Flask application object. The two JSON settings tell Flask not to sort
# keys and not to force ASCII-only output.
app = Flask(__name__)
app.config.update(
    JSON_SORT_KEYS=False,
    JSON_AS_ASCII=False,
)

# Messages at WARNING level and above are written to search.log.
logging.basicConfig(
    filename='search.log',
    level=logging.WARNING,
    format='%(asctime)s %(levelname)s %(name)s: %(message)s',
)

# Runs at import time against the 'manifests' directory (see utils.init_db).
utils.init_db(os.path.join('manifests'))

# Single simdjson parser instance shared by the request handlers in this module.
jsonparser = simdjson.Parser()
@cached(ttl=1800)
async def return_results_hashable(query: str, threshold: int) -> list[dict]:
    """Look up ``query`` in the ``/bypass_information`` array of database.json.

    The result is cached by ``aiocache`` with a TTL of 1800 seconds.

    Args:
        query: Search string forwarded to ``utils.return_results``.
        threshold: Match threshold forwarded to ``utils.return_results``.

    Returns:
        Whatever ``utils.return_results`` produces for the given arguments.
    """
    with open('database.json', 'rb') as db_file:
        raw_document = db_file.read()

    database = jsonparser.parse(raw_document).at_pointer('/bypass_information')  # type: ignore
    assert isinstance(database, simdjson.Array)

    search_list = utils.generate_list_for_search('database.json')
    return await utils.return_results(database, query, threshold, search_list)
@app.errorhandler(HTTPException)
def handle_exception(e):
    """Render any HTTP error as the matching http.cat image.

    Args:
        e: The ``HTTPException`` raised while handling the request.

    Returns:
        A ``(body, status)`` tuple: a small HTML snippet embedding the
        http.cat picture for the status code, and the status code itself.
    """
    # HTTPException.code is None on the bare base class. Fall back to 500 so
    # the image URL stays valid and the response carries an error status.
    code = e.code if e.code is not None else 500
    return f'<center><img src="https://http.cat/{code}"></center>', code
@app.route('/bypass', methods=["GET"])
async def bypass_list():
    """Serve the ``/bypass_list`` section of database.json.

    Returns:
        A 200 JSON response of the form
        ``{"status": "Successful", "data": <bypass_list>}``.
    """
    with open('database.json', 'rb') as db_file:
        document = jsonparser.parse(db_file.read())
        payload = {
            'status': 'Successful',
            'data': document.at_pointer('/bypass_list').as_dict(),
        }
        body = orjson.dumps(payload)

    return app.response_class(
        response=body,
        status=200,
        mimetype="application/json; charset=utf-8",
    )
@app.route('/app', methods=["GET"])
async def bypass_lookup():
    """Serve the app list, or search results when ``search`` is supplied.

    Without a ``search`` argument the ``/app_list`` section of database.json
    is returned. With one, the lower-cased term is passed to
    ``return_results_hashable`` with a threshold of 86; an empty result
    yields ``{"status": "Not Found"}`` and a warning in the application log.

    Returns:
        A JSON response; the HTTP status is 200 in every case.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('search', required=False)
    search = parser.parse_args().search

    if search is not None:
        matches = await return_results_hashable(search.lower(), 86)
        if not matches:
            body = orjson.dumps({'status': 'Not Found'})
            app.logger.warning(f"[{request.headers.get('User-Agent')}] Could not find app in database: {search}")
        else:
            body = orjson.dumps({'status': 'Successful', 'data': matches})
    else:
        with open('database.json', 'rb') as db_file:
            app_list = jsonparser.parse(db_file.read()).at_pointer('/app_list').as_list()
            body = orjson.dumps({'status': 'Successful', 'data': app_list})

    return app.response_class(
        response=body,
        status=200,
        mimetype="application/json; charset=utf-8",
    )
@app.route('/gh-webhook', methods=["POST"])
async def update_api():
    """Handle a GitHub webhook: rebuild the database or restart the API.

    The request must carry an ``X-Hub-Signature-256`` header that matches the
    HMAC-SHA256 of the raw body keyed with ``GITHUB_WEBHOOK_SECRET``.

    Returns:
        A ``(message, status)`` tuple:

        * 403 when the secret is not configured or no signature was sent.
        * 500 when the signature does not match.
        * 400 when the body is not a JSON object.
        * 200 ``"Rebuilt database"`` for a push to ``refs/heads/main`` of a
          repository whose full name contains ``manifests``.
        * 200 ``"Restarting API"`` for a push to ``refs/heads/main`` of a
          repository whose full name contains ``api``.
        * 200 ``"Nothing to do"`` for any other verified event (other refs,
          other repositories, events without a ``ref`` such as ``ping``).
    """
    secret = os.environ.get('GITHUB_WEBHOOK_SECRET')
    provided_signature = request.headers.get('X-Hub-Signature-256')
    if secret is None or not provided_signature:
        return "Endpoint not allowed. You're missing GITHUB_WEBHOOK_SECRET, or did not provide a signature.", 403

    expected_signature = 'sha256=' + hmac.new(secret.encode('utf-8'), request.data, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for non-ASCII str
    # arguments, which a malformed header value could otherwise trigger.
    if not hmac.compare_digest(expected_signature.encode('utf-8'), provided_signature.encode('utf-8')):
        return "Signatures didn't match!", 500

    content = request.json
    if not isinstance(content, dict):
        return "Unexpected payload", 400

    # Not every event carries 'ref' / 'repository.full_name'; read them
    # defensively so such events are acknowledged instead of raising.
    repository = content.get('repository')
    full_name = repository.get('full_name') if isinstance(repository, dict) else None
    if content.get('ref') != 'refs/heads/main' or not isinstance(full_name, str):
        return "Nothing to do", 200

    if 'manifests' in full_name:
        os.system('git submodule update --recursive --remote')
        utils.init_db(os.path.join('manifests'))
        # Drop cached search data so later lookups see the rebuilt database.
        utils.generate_list_for_search.cache_clear()
        await return_results_hashable.cache._clear()
        return "Rebuilt database", 200

    if 'api' in full_name:
        try:
            return "Restarting API", 200
        finally:
            # NOTE(review): this finally block runs before the function
            # actually returns, so the pull and restart happen before the
            # response is handed back to Flask - confirm this is intended.
            systemd_service = 'jbdetectapi'
            os.system('git pull')
            os.system(f'sudo /bin/systemctl restart {systemd_service}')

    # Verified push to main, but for a repository this endpoint does not act on.
    return "Nothing to do", 200
# Script entry point: when this file is executed directly, start Flask's
# built-in server with debug mode enabled.
if __name__ == '__main__':
    app.run(debug=True)