-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcheck_http
486 lines (431 loc) · 20.2 KB
/
check_http
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
#!/usr/bin/env python3
#-------------------------------------------------------------------------------
import sys
import os
import certifi
import subprocess
import argparse
import io
import pycurl
import socket
import ssl
import collections
import traceback
import datetime
import dns.query
import dns.zone
import dns.resolver
import dns.exception
import time
#-------------------------------------------------------------------------------
_candebug = False
def candebug():
global _candebug
return _candebug
def setcandebug(value):
global _candebug
_candebug = value
def infomsg(msg):
if candebug() == True:
print(msg, flush=True)
def exitnagios(status,message):
if status=="OK":
exitcode = 0
elif status=="WARNING":
exitcode = 1
elif status=="CRITICAL":
exitcode = 2
elif status=="UNKNOWN":
exitcode = 3
else:
exitcode = 4
print(status+": "+message, flush=True)
sys.exit(exitcode)
#-------------------------------------------------------------------------------
original_getaddrinfo = socket.getaddrinfo
def forced_ipv6_gai_family(*args, **kwargs):
global original_getaddrinfo
responses = original_getaddrinfo(*args, **kwargs)
return [response
for response in responses
if response[0] == socket.AF_INET6]
def forced_ipv4_gai_family(*args, **kwargs):
global original_getaddrinfo
responses = original_getaddrinfo(*args, **kwargs)
return [response
for response in responses
if response[0] == socket.AF_INET]
#-------------------------------------------------------------------------------
_debuginfo = []
_preheaders = []
_headers = {}
def _debug_function(debug_type, debug_msg):
global _debuginfo
try:
debug_msg = debug_msg.decode().strip()
infomsg("debug -> "+str(debug_type)+" -> "+debug_msg)
except:
pass
def _header_function(header_line):
global _preheaders
global _headers
# HTTP standard specifies that headers are encoded in iso-8859-1.
header_line = header_line.decode('iso-8859-1')
infomsg("raw response -> "+header_line)
# Header lines include the first status line (HTTP/1.x ...).
# We are going to ignore all lines that don't have a colon in them.
# This will botch headers that are split on multiple lines...
if ":" not in header_line:
if header_line!="" and len(_headers)==0:
_preheaders.append(header_line.strip())
return
# Break the header line into header name and value.
name, value = header_line.split(':', 1)
# Remove whitespace that may be present.
# Header lines include the trailing newline, and there may be whitespace
# around the colon.
name = name.strip()
value = value.strip()
# Header names are case insensitive.
# Lowercase name here.
name = name.lower()
infomsg("header -> "+name+": "+value)
# Now we can actually record the header name and value.
# Note: this only works when headers are not duplicated, see below.
_headers[name] = value
def generate_resolves(hostname,ipprotocol,port,dnsservers):
result = []
infomsg("generate_resolves: "+str(dnsservers))
if dnsservers != "":
listdnsservers = dnsservers.split(",")
infomsg("listdnsservers: "+str(listdnsservers))
maxretries = 3
for i in range(maxretries):
try:
if i>1:
time.sleep(i)
customresolver = dns.resolver.Resolver(configure=False)
customresolver.nameservers = listdnsservers
customresolver.retry_servfail = True
customresolver.edns = True
recordtype = ""
if ipprotocol in ["4","-4"]:
recordtype = "A"
elif ipprotocol in ["6","-6"]:
recordtype = "AAAA"
infomsg(recordtype)
result = [hostname+":"+str(port)+":"+str(customresolver.resolve(hostname,recordtype)[0])]
break
except:
pass
infomsg(result)
return result
def dolibcurlcheck_retries(ipprotocol,hostname,port,path,timeoutvalue,timeoutstatus,httpversion,sslversion,expect,unknown,headerstring,certificatedays,interface,dnsservers,retries):
exitlevel = "UNKNOWN"
exitmessage = "not executed"
counter = 0
while True:
try:
infomsg("retry "+str(counter))
exitlevel,exitmessage = dolibcurlcheck_simple(ipprotocol,hostname,port,path,timeoutvalue,timeoutstatus,httpversion,sslversion,expect,unknown,headerstring,certificatedays,interface,dnsservers)
counter = counter+1
if counter>retries:
break
elif exitlevel == "OK":
break
else:
time.sleep(5*counter)
except Exception as e:
exitlevel = "CRITICAL"
exitmessage = "exception - "+str(e)
break
exitnagios(exitlevel,exitmessage)
def dolibcurlcheck_simple(ipprotocol,hostname,port,path,timeoutvalue,timeoutstatus,httpversion,sslversion,expect,unknown,headerstring,certificatedays,interface,dnsservers):
global _debuginfo
global _preheaders
global _headers
cannotresolvecustommsg = "Cannot resolve using custom servers"
_debuginfo = []
_preheaders = []
_headers = {}
curl = None
timeoutmessage = None
failuremessage = None
responsecode = None
totaltime = None
certinfo = None
try:
infomsg("ssl "+str(sslversion))
curlsslfinal = None
if sslversion!=None:
httpprotocol = "https"
if sslversion == "default":
curlsslmin = pycurl.SSLVERSION_DEFAULT
sslversionplus = True
else:
sslversionplus = sslversion.endswith("+")
sslversion = sslversion.strip("+")
curlsslmin = None
infomsg(sslversion) # 1.2+
if sslversion == "TLSv1.0":
curlsslmin = pycurl.SSLVERSION_TLSv1_0
elif sslversion == "TLSv1.1":
curlsslmin = pycurl.SSLVERSION_TLSv1_1
elif sslversion == "TLSv1.2":
curlsslmin = pycurl.SSLVERSION_TLSv1_2
elif sslversion == "TLSv1.3":
curlsslmin = pycurl.SSLVERSION_TLSv1_3
if sslversionplus == False:
curlsslfinal = curlsslmin
else:
curlsslfinal = curlsslmin | curlsslmin
infomsg(sslversion) # 1.2+
else:
httpprotocol = "http"
if path.startswith("/")==False:
path = "/"+path
url = httpprotocol+"://"+hostname+":"+str(port)+path
infomsg("url "+url)
httpversionparts = httpversion.split(",")
if len(httpversionparts)>0:
curlhttpcode = 0
if "0.9" in httpversionparts:
curlhttpcode = curlhttpcode | pycurl.CURL_HTTP_VERSION_0_9
if "1.0" in httpversionparts:
curlhttpcode = curlhttpcode | pycurl.CURL_HTTP_VERSION_1_0
if "1.1" in httpversionparts:
curlhttpcode = curlhttpcode | pycurl.CURL_HTTP_VERSION_1_1
if "2.0" in httpversionparts:
curlhttpcode = curlhttpcode | pycurl.CURL_HTTP_VERSION_2_0
infomsg("precurl")
buffer = io.BytesIO()
curl = pycurl.Curl()
curl.reset()
#curl.setopt(pycurl.CAINFO, "/etc/ssl/certs/ca-certificates.crt")
#curl.setopt(pycurl.CAPATH, "/etc/ssl/certs/")
if candebug() == True:
curl.setopt(pycurl.VERBOSE, 1)
curl.setopt(pycurl.DEBUGFUNCTION, _debug_function)
curl.setopt(pycurl.CAINFO, certifi.where())
curl.setopt(pycurl.URL, url)
curl.setopt(pycurl.WRITEDATA, buffer)
curl.setopt(pycurl.HTTP_VERSION, curlhttpcode)
if ipprotocol in ["4","-4"]:
curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
elif ipprotocol in ["6","-6"]:
curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V6)
else:
curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)
if curlsslfinal!=None:
curl.setopt(pycurl.SSL_VERIFYPEER, 1) # default is 1
curl.setopt(pycurl.SSL_VERIFYHOST, 2) # default is 2
curl.setopt(pycurl.SSLVERSION, curlsslfinal)
else:
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
curl.setopt(pycurl.SSL_VERIFYHOST, 0)
if len(expect)>0 or unknown!="" or len(headerstring)>0:
curl.setopt(pycurl.HEADERFUNCTION, _header_function)
if httpprotocol=="https" and certificatedays!="":
curl.setopt(pycurl.OPT_CERTINFO, 1)
curl.setopt(pycurl.TIMEOUT, timeoutvalue)
#curl.setopt(pycurl.FOLLOWLOCATION, 1)
if interface!="":
curl.setopt(pycurl.INTERFACE, interface)
if len(dnsservers)>0:
resolves = generate_resolves(hostname,ipprotocol,port,dnsservers)
infomsg("resolves "+str(resolves))
if len(resolves)>0:
curl.setopt(pycurl.RESOLVE, resolves)
else:
raise Exception(cannotresolvecustommsg)
curl.perform()
responsecode = curl.getinfo(pycurl.RESPONSE_CODE)
totaltime = curl.getinfo(pycurl.TOTAL_TIME)
certinfo = curl.getinfo(pycurl.INFO_CERTINFO)
except pycurl.error as e:
infomsg(traceback.format_exc())
code, msg = e.args
if msg.endswith(": Couldn't connect to server") or msg.endswith(": Could not connect to server") or msg.startswith("Connection timed out after ") or msg.startswith("Operation timed out after ") or msg.startswith("Failed to connect to "):
timeoutmessage = msg
else:
failuremessage = msg
except Exception as e:
infomsg(traceback.format_exc())
msg = str(e)
if msg == cannotresolvecustommsg:
timeoutmessage = msg
else:
failuremessage = msg
finally:
if curl!=None:
curl.close()
infomsg("----- start of preheaders")
infomsg(_preheaders)
infomsg("----- end of preheaders")
infomsg("----- start of headers")
infomsg(_headers)
infomsg("----- end of headers")
infomsg("----- checks stage 1")
infomsg("timeoutmsg: "+str(timeoutmessage))
infomsg("failuremsg: "+str(failuremessage))
if failuremessage!=None:
toleratedfailures = []
toleratedfailures.append("Invalid HTTP header field was received: frame type: 1, stream: 1, name: [upgrade], value: [h2,h2c]")
if failuremessage not in toleratedfailures:
return "CRITICAL","error - "+str(failuremessage)+" | totaltime="+str(totaltime)
if timeoutmessage!=None:
return timeoutstatus,"error - "+str(timeoutmessage)+" | totaltime="+str(totaltime)
infomsg("----- checks stage 2")
if unknown!="":
foundunknown = False
for line in _preheaders:
if line.startswith(unknown):
foundunknown = True
if foundunknown == True:
return timeoutstatus,"match '"+unknown+"' in "+str(_preheaders)+" | totaltime="+str(totaltime)
if len(expect)>0:
foundexpect = False
for line in _preheaders:
for test in expect:
if line.startswith(test):
foundexpect = True
if foundexpect == False:
return "CRITICAL","cannot match '"+str(expect)+"' in "+str(_preheaders)+" | totaltime="+str(totaltime)
infomsg("----- checks stage 3")
for hsitem in headerstring:
headermatchparts = hsitem.split(":",1)
headermatchkey = headermatchparts[0].strip().lower()
headermatchvalue = headermatchparts[1].strip()
if headermatchkey in _headers:
if _headers[headermatchkey] != headermatchvalue:
return "CRITICAL","headerstring '"+headermatchkey+"' is '"+_headers[headermatchkey]+"' but it was expected '"+headermatchvalue+"' | totaltime="+str(totaltime)
else:
return "CRITICAL","cannot locate headerstring '"+headermatchkey+"' | totaltime="+str(totaltime)
infomsg("----- checks stage 4")
daysremaining = -1
if httpprotocol=="https" and certificatedays!="":
certexpirationraw = None
certexpirationparsed = None
try:
infomsg("certinfo is "+str(certinfo))
if certinfo==None:
certvars = []
elif len(certinfo)>0:
certvars = certinfo[0]
else:
certvars = []
for certvar in certvars:
certvarname, certvarvalue = certvar
if certvarname in ["Expire Date", "Expire date"]:
certexpirationraw = certvarvalue
break
if certexpirationraw!=None:
from dateutil.parser import parse
infomsg(certexpirationraw)
certexpirationparsed = parse(certexpirationraw)
except:
infomsg(traceback.format_exc())
pass
if certexpirationparsed==None:
return "CRITICAL","cannot parse certificate expiration date '"+str(certexpirationraw)+"' | totaltime="+str(totaltime)
else:
today = datetime.datetime.now(datetime.UTC)
timeremaining = certexpirationparsed.timestamp()-today.timestamp()
daysremaining = int(timeremaining/86400)
certificatedaysparts = certificatedays.split(",")
criticaldays = int(certificatedaysparts[0])
if len(certificatedaysparts)>1:
warningdays = int(certificatedaysparts[1])
else:
warningdays = -1
if daysremaining<criticaldays:
return "CRITICAL","the certificate is expiring in '"+str(daysremaining)+"' days | totaltime="+str(totaltime)+" remaining="+str(daysremaining)
elif daysremaining<warningdays:
return "WARNING","the certificate is expiring in '"+str(daysremaining)+"' days | totaltime="+str(totaltime)+" remaining="+str(daysremaining)
infomsg("----- checks stage 5")
if responsecode>=500:
return "WARNING","unexpected responsecode: "+str(responsecode)+" - totaltime: "+str(totaltime)+" | totaltime="+str(totaltime)
infomsg("----- checks stage 6")
return "OK","everything seems OK for URL '"+url+"' - responsecode: "+str(responsecode)+" - totaltime: "+str(totaltime)+" | totaltime="+str(totaltime)+" remaining="+str(daysremaining)
# context = ssl.create_default_context()
# with socket.create_connection((hostname, port)) as sock:
# with context.wrap_socket(sock, server_hostname=hostname) as ssock:
# # https://docs.python.org/3/library/ssl.html#ssl.SSLSocket.getpeercert
# cert = ssock.getpeercert()
# alternatives = []
# subject = dict(item[0] for item in cert["subject"])
# alternatives.append(subject["commonName"])
# subjectAltName = collections.defaultdict(set)
# for type_, san in cert["subjectAltName"]:
# subjectAltName[type_].add(san)
# for item in subjectAltName["DNS"]:
# if item not in alternatives:
# alternatives.append(item)
# infomsg(alternatives)
# matching = False
# for item in alternatives:
# if item == hostname:
# matching = True
# break
# if matching:
# response=0
# output = "can connect and certificate for '"+hostname+"' matches via extended search"
# else:
# response=2
# foundname = output.split("'")[1]
# output = "can connect, but the certificate found '"+foundname+"' name does not match the expected '"+hostname+"'"
#-------------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-4", "--ipv4", action="store_true", dest="ipv4", default=False, help="use ipv4 (exclusive with ipv6)")
parser.add_argument("-6", "--ipv6", action="store_true", dest="ipv6", default=False, help="use ipv6 (exclusive with ipv4)")
parser.add_argument("-0", "--ipv0", action="store_true", dest="ipv0", default=False, help="do not specify ip protocol version")
parser.add_argument("-H", "--hostname", dest="hostname", default="", help="server to check")
parser.add_argument("-A", "--path", dest="path", default="", help="path to check")
parser.add_argument("-p", "--port", dest="port", default="80", help="port to use, usually 80 or 443")
parser.add_argument("-t", "--timeout", dest="timeout", default="10:CRITICAL", help="timeout value:STATUS")
parser.add_argument("-S", "--sslversion", dest="sslversion", default=None, help="SSLv2, SSLv3, TLSv1, TLSv1.1, TLSv1.2, TLSv1.2+")
parser.add_argument("-e", "--expect", action="append", dest="expect", default=[], help="first (status) line of the server response (default: HTTP/1.)")
parser.add_argument("-K", "--unknown", dest="unknown", default="", help="match this header to return unknown")
parser.add_argument("-d", "--headerstring", action="append", dest="headerstring", default=[], help="string to expect in the response headers")
parser.add_argument("-C", "--certificatedays", dest="certificatedays", default="5,10", help="minimum number of days a certificate has to be valid crit,warn")
parser.add_argument("-P", "--httpversion", dest="httpversion", default="", help="0.9,1.0,1.1,2,3")
parser.add_argument("-I", "--interface", dest="interface", default="", help="what interface to use")
parser.add_argument("-D", "--dnsservers", dest="dnsservers", default="", help="dns servers to use")
parser.add_argument("-r", "--retries", dest="retries", default="5", help="number of times to try")
parser.add_argument("-®", "--debug", action="store_true", dest="debug", default=False, help="be more verbose")
args = parser.parse_args()
setcandebug(args.debug)
protocol = None
exclist = 0
if args.ipv0 == True :
protocol = ""
exclist = exclist+1
if args.ipv4 == True :
protocol = "-4"
exclist = exclist+1
socket.getaddrinfo = forced_ipv4_gai_family
if args.ipv6 == True :
protocol = "-6"
exclist = exclist+1
socket.getaddrinfo = forced_ipv6_gai_family
if (exclist>1) :
exitnagios("CRITICAL","select ip protocol version 4 or 6, or 0 to relay on OS handling")
elif (exclist<1) :
exitnagios("CRITICAL","no protocol selected")
retries = int(args.retries)
port = int(args.port)
parts = args.timeout.split(":")
timeoutvalue = int(parts[0])
if len(parts)>1:
timeoutstatus = parts[1]
else:
timeoutstatus = "CRITICAL"
if timeoutstatus in ["OK", "WARNING", "CRITICAL", "UNKNOWN"]:
infomsg("precall")
infomsg(str(args.hostname)+" "+str(port)+" "+str(args.httpversion)+" "+str(args.sslversion))
dolibcurlcheck_retries(protocol,args.hostname,port,args.path,timeoutvalue,timeoutstatus,args.httpversion,args.sslversion,args.expect,args.unknown,args.headerstring,args.certificatedays,args.interface,args.dnsservers,retries)
else:
exitnagios("CRITICAL","timeout status is not valid")
main()
#-------------------------------------------------------------------------------