-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon.py
133 lines (118 loc) · 3.5 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
公用函数
"""
import chardet
import hashlib
import re
from loguru import logger
from urllib import parse
def get_md5(content):
    """
    Compute the MD5 digest of the given data.

    :param content: data to hash; bytes-like objects are hashed as-is,
        ``str`` values are encoded as UTF-8 first
    :return: the MD5 digest as a lowercase hexadecimal string
    """
    if isinstance(content, str):
        # hashlib only accepts bytes-like input; a str would raise TypeError.
        content = content.encode('utf-8')
    md5 = hashlib.md5()
    md5.update(content)
    return md5.hexdigest()
def url_parsing_init(url):
    """
    Split a URL into its protocol, host, port and remainder.

    A URL without a scheme is treated as ``http``. When no port is given,
    ``'443'`` is used for ``https`` and ``'80'`` for everything else.

    :param url: the URL to parse
    :type url: str
    :return: a ``(protocol, host, port, rest)`` tuple of strings, where
        ``rest`` is whatever follows the host (path, query, fragment);
        ``(False, False, False, False)`` when ``url`` is empty or cannot
        be parsed
    """
    failure = (False, False, False, False)
    if not url:
        return failure
    try:
        # NOTE: splittype/splithost/splitport are deprecated helpers of
        # urllib.parse; they are kept here to preserve the exact splitting
        # behaviour callers rely on.
        protocol, rest = parse.splittype(url)
        if (protocol is not None and not rest.startswith('//')
                and re.match(r'\d+(?:[/?#]|$)', rest)):
            # Schemeless "host:port[/path]" input: splittype mistakes the
            # host for a scheme and the port for the remainder. Re-parse
            # the whole string as having no scheme.
            protocol, rest = None, url
        if protocol is None:
            protocol = 'http'
            rest = '//' + rest
        host, rest = parse.splithost(rest)
        # splithost yields None when no "//host" part exists; splitport
        # then raises and the input is reported as illegal below.
        host, port = parse.splitport(host)
        if port is None:
            port = '443' if protocol == 'https' else '80'
    except Exception as err:
        logger.warning('Input url is illegal: {}, error:'
                       ' {}'.format(url, err))
        return failure
    return protocol, host, port, rest
def standard_url(url, with_rest=True):
    """
    Normalize a URL into ``protocol://host:port/`` form.

    :param url: the URL to normalize
    :type url: str
    :param with_rest: whether to append the part after the host (with
        surrounding slashes trimmed and a trailing slash added)
    :type with_rest: bool
    :return: the normalized URL string, or ``False`` when the protocol,
        host or port could not be determined
    """
    protocol, host, port, rest = url_parsing_init(url)
    if not (protocol and host and port):
        return False
    sub_page = rest.strip('/')
    base = ''.join([protocol, '://', host, ':', port, '/'])
    if not (with_rest and sub_page):
        return base
    return ''.join([base, sub_page, '/'])
def get_host_from_url(url, with_port=False, with_type=False):
    """
    Extract the host from a URL, without protocol and port by default.

    :param url: the URL to parse
    :type url: str
    :param with_port: whether to append ``:port`` to the result; when the
        URL has no explicit port, ``443`` is used for ``https`` and ``80``
        otherwise
    :type with_port: bool
    :param with_type: whether to prefix the result with ``protocol://``;
        when the URL has no scheme, ``http`` is used
    :type with_type: bool
    :return: the host string (optionally decorated as requested), or
        ``None`` when parsing fails
    """
    try:
        protocol, remainder = parse.splittype(url)
        if (protocol is not None and not remainder.startswith('//')
                and re.match(r'\d+(?:[/?#]|$)', remainder)):
            # Schemeless "host:port[/path]" input: splittype mistakes the
            # host for a scheme, so re-parse it as having no scheme.
            protocol, remainder = None, url
        host, remainder = parse.splithost(remainder)
        if not host:
            # No "//host" part: the host is the leading segment of what is
            # left, so cut it off before any path, query or fragment.
            host = re.split(r'[/?#]', remainder, maxsplit=1)[0]
        domain, port = parse.splitport(host)
        if with_type:
            # Fall back to http so a missing scheme no longer breaks join.
            domain = ''.join([protocol or 'http', '://', domain])
        if with_port:
            if port is None:
                # No explicit port: use the default for the protocol.
                port = '443' if protocol == 'https' else '80'
            domain = ''.join([domain, ':', port])
        return domain
    except Exception as err:
        logger.warning('Get host failed, url: {}, error: {}'.format(url, err))
        return None
def guess_and_decode(content):
    """
    Guess the encoding of raw content and decode it to text.

    The encoding is detected with ``chardet``; if decoding with the
    detected encoding fails, UTF-8 is tried as a fallback.

    :param content: the raw content to decode; a ``str`` is returned as-is
    :return: the decoded text, or an empty string when the content cannot
        be decoded with either the detected encoding or UTF-8
    """
    if isinstance(content, str):
        # chardet.detect() rejects str input, so handle already-decoded
        # text before calling it.
        logger.info('The content is already str type, not need to decode.')
        return content
    encoding = chardet.detect(content)['encoding']
    if encoding == 'GB2312':
        # Widen the encoding range so that unusual symbols do not make
        # decoding fail.
        encoding = 'GB18030'
    try:
        return content.decode(encoding)
    except Exception as err:
        # Also reached when no encoding was detected (encoding is None).
        logger.warning('Decoding failed, now use utf-8,'
                       ' the error: %s' % err)
    try:
        return content.decode('utf-8')
    except UnicodeDecodeError:
        logger.warning('Cannot decode the page!')
        return ''
def is_ip(ip_str):
    """
    Check whether a string is a dotted-decimal IPv4 address.

    Each of the four octets must be in the range 0-255; up to one leading
    zero padding digit per octet position is accepted (e.g. ``01.2.3.4``).

    :param ip_str: the string to check
    :type ip_str: str
    :return: ``True`` if the whole string is an IPv4 address, else ``False``
    """
    octet = r'(25[0-5]|2[0-4]\d|[01]?\d\d?)'
    # re.ASCII restricts \d to 0-9; fullmatch (instead of ^...$) rejects
    # input with a trailing newline.
    pattern = re.compile(r'(' + octet + r'\.){3}' + octet, re.ASCII)
    return pattern.fullmatch(ip_str) is not None
def is_url(url_str):
    """
    Check whether a string starts with an http(s) URL.

    The bare prefixes ``http://`` and ``https://`` are not considered URLs.

    :param url_str: the string to check
    :type url_str: str
    :return: ``True`` if the string begins with an http/https URL,
        otherwise ``False``
    """
    pattern = re.compile(r"((https?):((//)|(\\\\))+([\w\d:#@%/;$()~_?\+-=\\\.&](#!)?)*)")
    matched = pattern.match(url_str) is not None
    return matched and url_str not in ("http://", "https://")