-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcheck_fail2ban
152 lines (125 loc) · 5.14 KB
/
check_fail2ban
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/env python3
#-------------------------------------------------------------------------------
import argparse
import sys
import os
import subprocess
import platform
import ipaddress
import GeoIP
#-------------------------------------------------------------------------------
# Module-wide debug switch: written by setcandebug(), read by candebug().
_candebug = False


def candebug():
    """Return the current value of the module-level debug flag."""
    return _candebug


def setcandebug(value):
    """Store *value* as the module-level debug flag.

    The value is kept exactly as given; no conversion is applied.
    """
    global _candebug
    _candebug = value
def infomsg(msg):
    """Print *msg* to standard output when debug mode is enabled.

    Nothing is printed unless the flag returned by candebug() is truthy.
    The output is flushed immediately.
    """
    # Plain truthiness test instead of `== True`, so any truthy flag value
    # turns debug output on.
    if candebug():
        print(msg, flush=True)
def exitnagios(status,message):
    """Print a "<status>: <message>" line and terminate the process.

    The exit code is derived from *status*: "OK" gives 0, "WARNING" gives 1,
    "CRITICAL" gives 2 and "UNKNOWN" gives 3; any other value gives 4.
    The line is flushed before exiting. This function does not return: it
    always ends with sys.exit().
    """
    codes_by_status = {"OK": 0, "WARNING": 1, "CRITICAL": 2, "UNKNOWN": 3}
    exitcode = codes_by_status.get(status, 4)
    print(status+": "+message, flush=True)
    sys.exit(exitcode)
#-------------------------------------------------------------------------------
def get_fail2ban_client_path():
    """Return the filesystem path of the fail2ban-client executable."""
    client_path = "/usr/bin/fail2ban-client"
    return client_path
def run_command(cmdlist):
    """Run an external command and return its standard output as a list of lines.

    Args:
        cmdlist: The program and its arguments as a list, handed to
            subprocess.run() as-is.

    Returns:
        The captured stdout with surrounding whitespace stripped, split on
        newline characters. Empty output therefore yields [""].

    If the command exits with a non-zero status, or cannot be started at all
    (for example the executable is missing or not executable), the failure is
    reported through exitnagios() with status CRITICAL.
    """
    try:
        result = subprocess.run(cmdlist, text=True, capture_output=True, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers FileNotFoundError/PermissionError raised when the
        # program cannot be launched; without it the plugin would die with a
        # traceback instead of a proper plugin status line.
        exitnagios("CRITICAL","Command failed: " + str(cmdlist) + ". Error: " + str(e))
    else:
        return result.stdout.strip().split("\n")
def check_fail2ban_status():
    """Ping the Fail2Ban server and exit with CRITICAL if it does not answer.

    Runs the fail2ban client with the "ping" argument through run_command().
    The check passes only when one of the returned output lines is exactly
    "Server replied: pong"; otherwise exitnagios() is called with CRITICAL.
    """
    ping_cmd = [get_fail2ban_client_path(), "ping"]
    infomsg("DEBUG: Running command: " + str(ping_cmd))
    reply_lines = run_command(ping_cmd)
    infomsg("DEBUG: Fail2Ban server response: " + str(reply_lines))
    if "Server replied: pong" in reply_lines:
        return
    exitnagios("CRITICAL","Fail2Ban server not responding. Check if it is running and accessible.")
def get_jail_list():
    """Return the names of the jails reported by the fail2ban client.

    Runs the client with the "status" argument and parses the first output
    line containing "Jail list:". The text after that marker is split on
    commas and each name is stripped of surrounding whitespace.

    Returns:
        A list of jail names; an empty list when the marker line carries no
        names.

    If no output line contains "Jail list:", exitnagios() is called with
    status CRITICAL.
    """
    cmdlist = [get_fail2ban_client_path(), "status"]
    infomsg("DEBUG: Running command: " + str(cmdlist))
    output = run_command(cmdlist)
    for line in output:
        if "Jail list:" in line:
            raw_names = line.split("Jail list:")[-1].split(",")
            # Drop empty entries: splitting an empty list text on "," gives
            # [""], which would otherwise be treated as a jail named "".
            jail_list = [name.strip() for name in raw_names if name.strip()]
            infomsg("DEBUG: Jails list: " + ", ".join(jail_list))
            return jail_list
    exitnagios("CRITICAL","Unable to retrieve the jail list.")
def get_banned_ips(jail_name):
    """Return the currently banned IPs reported for one jail.

    Runs the fail2ban client with "status <jail_name>" and looks for the
    first output line containing "Banned IP list:". The text after that
    marker is split on whitespace.

    Returns:
        A list of address strings, or an empty list when no output line
        contains the marker.
    """
    marker = "Banned IP list:"
    status_cmd = [get_fail2ban_client_path(), "status", jail_name]
    infomsg("DEBUG: Running command: " + str(status_cmd))
    status_lines = run_command(status_cmd)
    for row in status_lines:
        if marker not in row:
            continue
        # Everything after the last occurrence of the marker, cut on whitespace.
        banned_ips = row.rpartition(marker)[2].split()
        infomsg("DEBUG: Currently banned IPs for jail " + jail_name + ": " + " ".join(banned_ips))
        return banned_ips
    return []
def is_ipv6(ip):
    """Return True if *ip* parses as an IPv6 address, otherwise False.

    Values that ipaddress.ip_address() rejects with ValueError also give False.
    """
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return parsed.version == 6
def get_city_country(ip_address):
    """Look up the country name of an IP address in the GeoIP country databases.

    The IPv6 database file is used when is_ipv6() reports an IPv6 address,
    the IPv4 database file otherwise.

    Args:
        ip_address: The address to look up, as a string.

    Returns:
        A (country, city) tuple. country is whatever the GeoIP lookup
        returns for the address; city is always "" because only country
        data is queried. Both are None when GeoIP.open() yields None.
    """
    use_v6 = is_ipv6(ip_address)
    if use_v6:
        database = "/usr/share/GeoIP/GeoIPv6.dat"
    else:
        database = "/usr/share/GeoIP/GeoIP.dat"
    gi = GeoIP.open(database, GeoIP.GEOIP_STANDARD)
    if gi is None:
        return None, None
    lookup = gi.country_name_by_addr
    if use_v6:
        # The IPv6 country database has to be queried through the *_v6
        # lookup; the plain lookup is the IPv4 one. Fall back to the plain
        # lookup if the installed binding does not offer the v6 variant.
        # NOTE(review): confirm against the installed GeoIP binding.
        lookup = getattr(gi, "country_name_by_addr_v6", lookup)
    country = lookup(ip_address)
    city = ""
    return country, city
#-------------------------------------------------------------------------------
def main():
    """Entry point: count banned IPs across jails and report a plugin status.

    Command-line options:
        -j / --jail      check only this jail instead of every listed jail
        -w / --warning   warning threshold for banned IPs (default "1")
        -c / --critical  critical threshold for banned IPs (default "2")
        -d / --debug     print DEBUG lines while running

    After confirming the Fail2Ban server answers, the banned IPs of every
    selected jail are collected (each distinct IP counted once) together with
    their GeoIP country. The final state is CRITICAL when the number of
    distinct banned IPs is >= the critical threshold, WARNING when it is >=
    the warning threshold, and OK otherwise. The result, including
    "banned=" and "jails=" counters after the "|" separator, is emitted
    through exitnagios(), which terminates the process.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-j", "--jail", dest="jail", help="Specify a specific jail to check.")
    parser.add_argument("-w", "--warning", dest="warning", default="1", help="Warning threshold for banned IPs.")
    parser.add_argument("-c", "--critical", dest="critical", default="2", help="Critical threshold for banned IPs.")
    # "-d" is the typeable short form; the original "-®" spelling is kept so
    # existing invocations continue to work.
    parser.add_argument("-d", "-®", "--debug", action="store_true", dest="debug", default=False, help="be more verbose")
    args = parser.parse_args()
    setcandebug(args.debug)

    # Report non-numeric thresholds as a plugin status rather than crashing
    # with a traceback.
    try:
        warning = int(args.warning)
        critical = int(args.critical)
    except ValueError:
        exitnagios("UNKNOWN","Warning and critical thresholds must be integers.")

    check_fail2ban_status()

    jails = [args.jail] if args.jail else get_jail_list()

    # Maps each distinct banned IP to [city, country]; an IP banned in
    # several jails is looked up and counted only once.
    dict_banned = {}
    for jail_name in jails:
        banned_ips = get_banned_ips(jail_name)
        infomsg("DEBUG: Jail " + jail_name + " has " + str(len(banned_ips)) + " banned IP(s): " + ", ".join(banned_ips))
        for banned_ip in banned_ips:
            if banned_ip not in dict_banned:
                country, city = get_city_country(banned_ip)
                dict_banned[banned_ip] = [city,country]

    countjails = len(jails)
    countbanned = len(dict_banned)
    state = "OK"
    if countbanned >= critical:
        state = "CRITICAL"
    elif countbanned >= warning:
        state = "WARNING"
    exitnagios(state,str(countjails)+" detected jails with "+str(countbanned)+" banned IPs: " + str(dict_banned)+" | banned="+str(countbanned)+" jails="+str(countjails))
# Run the check only when this file is executed as a script.
if __name__ == "__main__":
    main()
#-------------------------------------------------------------------------------