-
Notifications
You must be signed in to change notification settings - Fork 2
/
npm.py
97 lines (79 loc) · 2.91 KB
/
npm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import json
from time import sleep
import requests
import os
import tempfile
import shutil
from flask import render_template
NPM_ADDRESS = 'https://www.npmjs.com/'
PROXIES = {
'http': 'http://127.0.0.1:8080',
'https': 'http://127.0.0.1:8080',
}
def parse_package(file):
return json.load(file)
def extract_packages(file):
parsed = json.load(file)
dependencies = parsed['dependencies']
return dependencies
def check_package_exists(package_name):
# , proxies=PROXIES, verify=False)
response = requests.get(NPM_ADDRESS + "package/" +
package_name, allow_redirects=False)
return (response.status_code == 200)
def is_scoped(package_name):
split_package_name = package_name.split('/')
return (len(split_package_name) > 1)
def check_scope_exists(package_name):
split_package_name = package_name.split('/')
scope_name = split_package_name[0][1:]
# , proxies=PROXIES, verify=False)
response = requests.get(
NPM_ADDRESS + "~" + scope_name, allow_redirects=False)
return (response.status_code == 200)
def is_vulnerable(package_name):
if (not check_package_exists(package_name)):
if(is_scoped(package_name)):
if (not check_scope_exists(package_name)):
return True
else:
return True
return False
def get_vulnerable_packages(packages):
for package in packages:
sleep(1) # prevent npm rate limit ban
if is_vulnerable(package):
yield package
def upload_package_by_npm(path):
oldcwd = os.getcwd()
os.chdir(path)
#os.system('npm pack --pack-destination=' + oldcwd)
os.system('npm publish')
os.chdir(oldcwd)
def remove_package_by_npm(path):
oldcwd = os.getcwd()
os.chdir(path)
os.system('npm unpublish -f')
os.chdir(oldcwd)
def generate_package(project_id, package, publish):
with tempfile.TemporaryDirectory() as poc_dir:
shutil.copy('payload_package/index.js', poc_dir)
shutil.copy('payload_package/extract.js', poc_dir)
packagejson_string = render_template("package.json", package_name=package.name, package_version=prepare_version_number(package.version), project_id=project_id)
with open(poc_dir + "/package.json", "w") as packagejson_file:
packagejson_file.write(packagejson_string)
if publish:
upload_package_by_npm(poc_dir)
else:
remove_package_by_npm(poc_dir)
def prepare_version_number(version: str):
if version[0].isnumeric():
return version
elif version[0] == '^':
split_semver = version[1:].split('.')
return "{}.{}.{}".format(split_semver[0], int(split_semver[1])+1, split_semver[2])
elif version[0] == '~':
split_semver = version[1:].split('.')
return "{}.{}.{}".format(split_semver[0], split_semver[1], int(split_semver[2])+1)
else:
raise "Broken version number"