-
Notifications
You must be signed in to change notification settings - Fork 1
/
net.py
322 lines (285 loc) · 12.3 KB
/
net.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
from netmiko import Netmiko, NetmikoTimeoutException, NetmikoAuthenticationException
import re
from random import randint
class Connection:
    """Netmiko-based helper for querying and configuring one Cisco IOS device.

    Every public method opens a fresh SSH session, runs its command(s) and
    closes the session again; no session is kept open between calls.
    Connection problems are not raised: the ``send_*`` methods and
    ``try_connection`` return a status string instead (see each method).
    """

    def __init__(self, host, username, password):
        """Store the connection parameters; no session is opened here."""
        self.host = host
        self.username = username
        self.password = password
        # Netmiko driver name used for every session opened by this object.
        self.device_type = "cisco_ios"

    def interfaces(self):
        """Return the interface stanzas of the running config.

        The running config is split on "!" and every section that contains a
        line starting with ``interface <name>`` is stored under ``<name>`` as
        the list of its non-empty lines (header line included).

        Returns:
            dict mapping interface name to a list of config lines. Empty when
            the running config could not be fetched or has no interfaces.
        """
        interfaces = {}
        for config_section in self.running_config().split("!"):
            # Anchor on the start of a line so that sections which merely
            # mention the word (e.g. "passive-interface default") are skipped
            # instead of being reported as an interface.
            header = re.search(
                r"^interface (.+?)\s*$", config_section, re.MULTILINE
            )
            if header is None:
                continue
            interfaces[header.group(1)] = [
                line for line in config_section.split("\n") if line
            ]
        return interfaces

    def running_config(self):
        """Return the output of ``sh running-config`` (or a status string)."""
        return self.send_cmd("sh running-config")

    def stats_policy_interface(self, interface):
        """Parse ``sh policy-map interface <interface>`` into a dict.

        Returns:
            dict with ``"success"`` always set. When lines are recognised it
            also holds ``"output_policy"`` (policy name) and ``"classes"``,
            keyed by the class name with the ``<policy>-`` prefix removed,
            each holding ``class_name``, ``offered_rate`` and ``drop_rate``
            (rates as strings, in bps). On a parse error ``"success"`` is
            False and the dict may be partially filled.
        """
        stats = {}
        try:
            output = self.send_cmd("sh policy-map interface {}".format(interface))
            for line in output.split("\n"):
                if "Service-policy output" in line:
                    policy_name = line.replace(
                        "Service-policy output: ", ""
                    ).replace(" ", "")
                    stats["output_policy"] = policy_name
                    stats["classes"] = {}
                if "Class-map:" in line:
                    class_name = (
                        line.replace("Class-map: ", "")
                        .replace("(match-all)", "")
                        .replace("(match-any)", "")
                        .replace(" ", "")
                    )
                    # Classes are keyed by their name without the policy prefix.
                    service = class_name.replace(policy_name + "-", "")
                    stats["classes"][service] = {"class_name": class_name}
                if "offered rate" in line:
                    # Both rates are reported on the same output line.
                    stats["classes"][service]["offered_rate"] = (
                        re.search(r"offered rate (.*?) bps", line)
                        .group(1)
                        .replace(" ", "")
                    )
                    stats["classes"][service]["drop_rate"] = (
                        re.search(r"drop rate (.*?) bps", line)
                        .group(1)
                        .replace(" ", "")
                    )
            stats["success"] = True
        except Exception:
            # Unexpected output layout (e.g. a rate line before any class).
            stats["success"] = False
        return stats

    def create_acl(self, acl_name, source_ip):
        """Create an extended ACL permitting IP traffic from ``source_ip``."""
        return self.send_config_cmds(
            [
                "ip access-list extended {}".format(acl_name),
                "permit ip host {} any".format(source_ip),
            ]
        )

    def shutdown_int(self, interface):
        """Administratively shut down ``interface``."""
        return self.send_config_cmds(["interface {}".format(interface), "shutdown"])

    def no_shutdown_int(self, interface):
        """Re-enable ``interface``."""
        return self.send_config_cmds(
            ["interface {}".format(interface), "no shutdown"]
        )

    def set_ip_int(self, ip, mask, interface):
        """Assign ``ip``/``mask`` to ``interface``."""
        return self.send_config_cmds(
            ["interface {}".format(interface), "ip address {} {}".format(ip, mask)]
        )

    def no_ip_int(self, interface):
        """Remove the IP address from ``interface``."""
        return self.send_config_cmds(
            ["interface {}".format(interface), "no ip address"]
        )

    def set_hostname(self, hostname):
        """Set the device hostname."""
        return self.send_config_cmds(["hostname {}".format(hostname)])

    def class_match_protocol(self, class_name, protocol):
        """Create a match-all class-map matching ``protocol``."""
        return self.send_config_cmds(
            [
                "class-map match-all {}".format(class_name),
                "match protocol {}".format(protocol),
            ]
        )

    def class_match_dscp(self, class_name, dscp_value):
        """Create a match-all class-map matching ``dscp_value``."""
        return self.send_config_cmds(
            [
                "class-map match-all {}".format(class_name),
                "match ip dscp {}".format(dscp_value),
            ]
        )

    @staticmethod
    def _policy_setting_commands(policy_name, classes, prefix):
        """Build the policy-map commands shared by set/unset_policy_setting.

        ``prefix`` is "" to apply the settings or "no " to remove them.

        Raises:
            TypeError: if ``classes`` is not a list.
        """
        if not isinstance(classes, list):
            raise TypeError("Argument classes should be of list type")
        commands = ["policy-map {}".format(policy_name)]
        for _class in classes:
            commands.append("class {}".format(_class["name"]))
            # The historical (misspelt) key "bandwith" is what callers pass;
            # the correctly spelt key is accepted as well.
            bandwidth = _class.get("bandwith")
            if bandwidth is None:
                bandwidth = _class.get("bandwidth")
            if bandwidth is not None:
                commands.append("{}bandwidth {}".format(prefix, bandwidth))
            if _class.get("priority") is not None:
                commands.append("{}priority {}".format(prefix, _class["priority"]))
            if _class.get("dscp") is not None:
                commands.append("{}set ip dscp {}".format(prefix, _class["dscp"]))
        return commands

    def unset_policy_setting(self, policy_name, classes):
        """Remove QoS settings from classes of ``policy_name``.

        ``classes`` has the same shape as for ``set_policy_setting``; every
        setting that is not None is removed with its ``no`` form.

        Raises:
            TypeError: if ``classes`` is not a list.
        """
        return self.send_config_cmds(
            self._policy_setting_commands(policy_name, classes, "no ")
        )

    def set_policy_setting(self, policy_name, classes):
        """Apply QoS settings to classes of ``policy_name``.

        Argument 'classes' is a list of dictionaries where every dict
        represents a class with its Quality of Service settings. Settings
        that are None (or absent) are skipped. Example:

            classes = [{'name': 'example',
                        'bandwith': None,
                        'priority': None,
                        'dscp': None,
                        }, ...]

        Raises:
            TypeError: if ``classes`` is not a list.
        """
        return self.send_config_cmds(
            self._policy_setting_commands(policy_name, classes, "")
        )

    def apply_policy_to_int(self, policy_name, interface, type):
        """Attach ``policy_name`` to ``interface`` in direction ``type``."""
        return self.send_config_cmds(
            [
                "interface {}".format(interface),
                "service-policy {} {}".format(type, policy_name),
                "load-interval 30",
            ]
        )

    def get_current_policy_name(self, interface_name, type="output"):
        """Return the policy attached to ``interface_name`` for ``type``.

        Returns None when the interface has no ``service-policy <type>`` line.

        Raises:
            KeyError: if ``interface_name`` is not in the running config
                (which is also the case when the device is unreachable).
        """
        string_match = "service-policy {}".format(type)
        for setting in self.interfaces()[interface_name]:
            if string_match in setting:
                return setting.replace(string_match, "").replace(" ", "")
        return None

    @staticmethod
    def _service_match_commands(class_name, service):
        """Return the ACL and class-map commands that classify one service."""
        commands = []
        access_list_name = None
        if service.match_ips:
            access_list_name = _generate_acl_name(service.name)
            # Drop and recreate the ACL so stale entries do not survive.
            commands.append("no ip access-list extended {}".format(access_list_name))
            commands.append("ip access-list extended {}".format(access_list_name))
            has_ports = bool(service.match_tcp_ports or service.match_udp_ports)
            for raw_ip in service.match_ips.split(","):
                ip = _no_spaces(raw_ip)
                if not ip:
                    # Skip empty entries such as a trailing comma.
                    continue
                if service.match_tcp_ports:
                    commands.append(
                        "permit tcp host {} any eq {}".format(
                            ip, _ip_list_to_str(service.match_tcp_ports)
                        )
                    )
                if service.match_udp_ports:
                    commands.append(
                        "permit udp host {} any eq {}".format(
                            ip, _ip_list_to_str(service.match_udp_ports)
                        )
                    )
                if not has_ports:
                    commands.append("permit ip host {} any".format(ip))
        commands.append("class-map match-all {}".format(class_name))
        if service.match_protocol:
            commands.append("match protocol {}".format(service.match_protocol))
        if access_list_name:
            commands.append("match access-group name {}".format(access_list_name))
        if service.match_dscp:
            for dscp in service.match_dscp.split(","):
                commands.append("match dscp {}".format(_no_spaces(dscp)))
        return commands

    @staticmethod
    def _service_action_commands(class_name, service_settings):
        """Return the policy-map class stanza for one service."""
        commands = ["class {}".format(class_name)]
        if service_settings.min_bandwidth:
            commands.append("bandwidth {}".format(service_settings.min_bandwidth))
        if service_settings.max_bandwidth:
            # NOTE(review): the x1000 suggests max_bandwidth is stored in
            # kbps while "shape average" takes bps -- confirm.
            commands.append(
                "shape average {}".format(service_settings.max_bandwidth * 1000)
            )
        if service_settings.mark_dscp:
            commands.append("set dscp {}".format(service_settings.mark_dscp))
        return commands

    def generate_policy_to_int(self, policy, interface, type="output"):
        """Generate the full QoS configuration for ``policy`` and push it.

        For every entry of ``policy.services`` an ACL (when IPs are given) and
        a class-map are created; the policy-map is then recreated once with
        one class stanza per service and attached to ``interface`` in
        direction ``type``, replacing any policy currently attached.

        Returns:
            The list of commands that was sent, or None if anything failed
            while building it (including the interface not being found).
        """
        try:
            policy_map_name = _generate_policy_name(policy.name)
            commands = []
            class_stanzas = []
            for service_settings in policy.services:
                service = service_settings.service
                class_name = _generate_class_name(policy.name, service.name)
                commands.extend(self._service_match_commands(class_name, service))
                class_stanzas.extend(
                    self._service_action_commands(class_name, service_settings)
                )
            # Recreate the policy-map exactly once and add every class to it;
            # doing this per service would wipe the classes added before.
            commands.append("no policy-map {}".format(policy_map_name))
            commands.append("policy-map {}".format(policy_map_name))
            commands.extend(class_stanzas)
            commands.append("interface {}".format(interface.name))
            commands.append("bandwidth {}".format(interface.bandwidth))
            commands.append("load-interval 30")
            current_policy_name = self.get_current_policy_name(
                interface.name, type=type
            )
            if current_policy_name:
                commands.append(
                    "no service-policy {} {}".format(type, current_policy_name)
                )
            commands.append("service-policy {} {}".format(type, policy_map_name))
            self.send_config_cmds(commands)
            return commands
        except Exception:
            return None

    def check_policy_interface(self, interface_name, policy_name):
        """Return True if ``policy_name`` appears in the interface's config.

        Raises:
            KeyError: if ``interface_name`` is not in the running config.
        """
        return any(
            policy_name in config_line
            for config_line in self.interfaces()[interface_name]
        )

    def _connect(self, timeout):
        """Open and return a new Netmiko session with the given timeout."""
        return Netmiko(
            host=self.host,
            username=self.username,
            password=self.password,
            device_type=self.device_type,
            timeout=timeout,
        )

    def send_config_cmds(self, commands):
        """Send ``commands`` in configuration mode and return the device output.

        Returns "failed authentication" on bad credentials and "timeout" on
        any other failure.
        """
        try:
            connection = self._connect(timeout=150)
            try:
                return connection.send_config_set(commands)
            finally:
                # Always release the SSH session, even if the commands failed.
                connection.disconnect()
        except NetmikoAuthenticationException:
            # NOTE(review): spelt with a space here but with an underscore in
            # send_cmd/try_connection; kept as-is because callers may compare
            # against this exact string.
            return "failed authentication"
        except Exception:
            return "timeout"

    def try_connection(self):
        """Check reachability and credentials.

        Returns "success", "failed_authentication" or "timeout".
        """
        try:
            self._connect(timeout=10).disconnect()
            return "success"
        except NetmikoAuthenticationException:
            return "failed_authentication"
        except Exception:
            return "timeout"

    def send_cmd(self, command):
        """Run one exec-mode ``command`` and return its output.

        Returns "failed_authentication" on bad credentials and "timeout" on
        any other failure.
        """
        try:
            connection = self._connect(timeout=20)
            try:
                return connection.send_command(command)
            finally:
                # Always release the SSH session, even if the command failed.
                connection.disconnect()
        except NetmikoAuthenticationException:
            return "failed_authentication"
        except Exception:
            return "timeout"
def _ip_list_to_str(ip_list):
return str(ip_list).replace("[", "").replace("]", "").replace(",", " ")
def _generate_class_name(policy_name, service_name):
return "D-{}-{}".format(policy_name.replace(" ", ""), service_name.replace(" ", ""))
def _generate_acl_name(service_name):
return "D-{}".format(service_name.replace(" ", ""))
def _generate_policy_name(policy_name):
return "D-{}".format(policy_name.replace(" ", ""))
def _no_spaces(text):
return "{}".format(text.replace(" ", ""))