-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcheck_crtsh
191 lines (161 loc) · 6.7 KB
/
check_crtsh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python3
#-------------------------------------------------------------------------------
#https://crt.sh/?q=*.google.com&output=json
#and check that issuer+name and issuer_ca_id in list
#wget "https://crt.sh/?q=google.com&output=json" -O - -q | jq
import urllib.error
import urllib.request
import argparse
import sys
import os
import json
import datetime
import dateutil.parser
import time
import socket
#-------------------------------------------------------------------------------
_candebug = False
def candebug():
global _candebug
return _candebug
def setcandebug(value):
global _candebug
_candebug = value
def infomsg(msg):
if candebug() == True:
print(msg, flush=True)
def exitnagios(status,message):
if status=="OK":
exitcode = 0
elif status=="WARNING":
exitcode = 1
elif status=="CRITICAL":
exitcode = 2
elif status=="UNKNOWN":
exitcode = 3
else:
exitcode = 4
print(status+": "+message, flush=True)
sys.exit(exitcode)
#-------------------------------------------------------------------------------
original_getaddrinfo = socket.getaddrinfo
def forced_ipv6_gai_family(*args, **kwargs):
global original_getaddrinfo
responses = original_getaddrinfo(*args, **kwargs)
return [response
for response in responses
if response[0] == socket.AF_INET6]
def forced_ipv4_gai_family(*args, **kwargs):
global original_getaddrinfo
responses = original_getaddrinfo(*args, **kwargs)
return [response
for response in responses
if response[0] == socket.AF_INET]
#-------------------------------------------------------------------------------
def load_json(domain,retries):
#if args.exclude is True:
# ctx = ssl.create_default_context()
# ssl_handler = urllib.request.HTTPSHandler(context=ctx)
# ssl_opener = urllib.request.build_opener(ssl_handler)
# urllib.request.install_opener(ssl_opener)
item_url = "https://crt.sh/?q="+domain+"&output=json"
counter = 0
while True:
try:
counter = counter+1
item_request = urllib.request.Request(item_url)
infomsg(item_url)
item_request.add_header("Accept", "application/json")
item_request.add_header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36")
infomsg("sending request")
item_raw = urllib.request.urlopen(item_request, timeout=3)
infomsg("request done")
object_json = json.load(item_raw)
infomsg("json don")
except Exception as e:
object_json = None
message = str(e)
infomsg(message)
if message.startswith("HTTP Error "):
parts = message.split(" ")
rawerrorcode = parts[2].replace(":","")
parsederrorcode = int(rawerrorcode)
infomsg(parsederrorcode)
if parsederrorcode >= 500:
exitnagios("WARNING","server error "+str(parsederrorcode))
if object_json != None:
break
elif counter > retries:
break
else:
time.sleep(5+(counter*2))
return object_json
#-------------------------------------------------------------------------------
def docrtshcall(mode,hostname,valid,timeoutvalue,retries):
mode=mode.lower()
if mode=="ipv6":
socket.getaddrinfo = forced_ipv6_gai_family
elif mode=="ipv4":
socket.getaddrinfo = forced_ipv4_gai_family
else:
exitnagios("CRITICAL","invalid IP mode")
hostnameparts = hostname.split(".")
if "" in hostnameparts:
hostnameparts.remove("")
if len(hostnameparts)==2:
hostnameparts.insert(0,"*")
hostname = ".".join(hostnameparts)
issuers = valid.split(";")
start = datetime.datetime.now(datetime.UTC)
parsed = load_json(hostname,retries)
elapsed = datetime.datetime.now(datetime.UTC)-start
duration = elapsed.total_seconds()
if parsed != None:
valid = True
problems = []
totalfound = len(parsed)
for item in parsed:
issuerid = str(item["issuer_ca_id"])
if issuerid not in issuers:
rawnotafter = str(item["not_after"])
parsednotafter = dateutil.parser.parse(rawnotafter)
infomsg(parsednotafter)
if parsednotafter >= datetime.datetime.now():
valid = False
problems.append(item)
infomsg(item)
if totalfound==0:
exitnagios("WARNING","no certificates in crt.sh | found="+str(totalfound)+" duration="+str(duration))
elif valid==True:
exitnagios("OK","no issues found | found="+str(totalfound)+" duration="+str(duration))
else:
exitnagios("CRITICAL","found problems --- "+str(problems)+" | found="+str(totalfound)+" duration="+str(duration))
exitnagios("WARNING","crt.sh does not answer properly")
#-------------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-H", "--hostname", dest="hostname", default="", help="domain to check")
parser.add_argument("-v", "--valid", dest="valid", default="", help="valid ids")
parser.add_argument("-t", "--timeout", dest="timeout", default="10:WARNING", help="timeout value:STATUS")
parser.add_argument("-®", "--debug", action="store_true", dest="debug", default=False, help="be more verbose")
args = parser.parse_args()
setcandebug(args.debug)
if (args.hostname == ""):
exitnagios("CRITICAL","hostname not defined")
elif (args.valid == ""):
exitnagios("CRITICAL","valid CA ids not defined")
parts = args.timeout.split(":")
timeoutvalue = int(parts[0])
if len(parts)>1:
timeoutstatus = parts[1]
else:
timeoutstatus = "WARNING"
start = datetime.datetime.now(datetime.UTC)
docrtshcall("ipv6",args.hostname,args.valid,timeoutvalue,5)
docrtshcall("ipv4",args.hostname,args.valid,timeoutvalue,5)
elapsed = datetime.datetime.now(datetime.UTC)-start
duration = elapsed.total_seconds()
exitnagios(timeoutstatus,"cannot get crt.sh data | found=0 duration="+str(duration))
if __name__ == "__main__":
main()
#-------------------------------------------------------------------------------