-
Notifications
You must be signed in to change notification settings - Fork 1
/
firewalltests.py
274 lines (245 loc) · 9.93 KB
/
firewalltests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
#!/usr/bin/env python3
from random import randint
from copy import deepcopy
import struct
from switchyard.lib.packet import *
from switchyard.lib.address import *
from switchyard.lib.common import *
from switchyard.lib.testing import *
# Rule set handed to the firewall under test.  firewall_tests() registers this
# text with the scenario as 'firewall_rules.txt'.  Lines beginning with '#'
# inside the string are comments of the rules file itself; the "rule N" labels
# are the numbers the test descriptions refer to.
firewall_rules = '''
# drop everything from an internal subnet which shouldn't be allowed
# to communicate with rest of internet
# rule 1
deny ip src 192.168.42.0/24 dst any
# rule 2
deny ip src any dst 192.168.42.0/24
# allow traffic to/from an internal web server that should
# be accessible to external hosts
# rule 3
permit tcp src 192.168.13.13 srcport 80 dst any dstport any
# rule 4
permit tcp src any srcport any dst 192.168.13.13 dstport 80
# allow DNS (udp port 53) traffic in/out of network
# rule 5
permit udp src 192.168.0.0/16 srcport any dst any dstport 53
# rule 6
permit udp src any srcport 53 dst 192.168.0.0/16 dstport any
# allow internal hosts access to web (tcp ports 80 and 443)
# rate limit http traffic to 100 kbit/s (12500 bytes/sec), but
# don't rate limit any encrypted HTTP traffic.
# rule 7
permit tcp src 192.168.0.0/16 srcport any dst any dstport 80 ratelimit 12500
# rule 8
permit tcp src any srcport 80 dst 192.168.0.0/16 dstport any ratelimit 12500
# rule 9
permit tcp src 192.168.0.0/16 srcport any dst any dstport 443
# rule 10
permit tcp src any srcport 443 dst 192.168.0.0/16 dstport any
# permit, but impair certain traffic flows
# rule 11
permit tcp src 192.168.0.0/24 srcport any dst any dstport 8000 impair
# permit, but rate limit icmp to 150 bytes/sec.
# NB: this includes *both* directions!
# rule 12
permit icmp src any dst any ratelimit 150
# block everything else
# rule 13
deny ip src any dst any
'''
def rand16(start=0):
    """Return a random integer between ``start`` and 65535, both inclusive."""
    upper = 0xFFFF  # largest unsigned 16-bit value
    return randint(start, upper)
def rand32(start=0):
    """Return a random integer between ``start`` and 2**32 - 1, both inclusive."""
    upper = (1 << 32) - 1  # largest unsigned 32-bit value
    return randint(start, upper)
def rand8(start=0):
    """Return a random integer between ``start`` and 255, both inclusive."""
    upper = 0xFF  # largest unsigned 8-bit value
    return randint(start, upper)
def mketh(xtype = EtherType.IP):
    """Return an Ethernet header with random source and destination addresses.

    Parameters:
        xtype: value assigned to the header's ``ethertype`` field
            (defaults to ``EtherType.IP``).

    Returns:
        A new ``Ethernet`` header whose ``src`` and ``dst`` are each set from
        6 raw bytes: two zero bytes followed by a random 32-bit value.
    """
    e = Ethernet()
    e.ethertype = xtype
    # '!' selects standard sizes with no alignment padding, so each address is
    # exactly 6 bytes.  With the default native mode, 'xxI' aligns the int to
    # its natural boundary and typically packs to 8 bytes instead.
    e.src = struct.pack('!xxI', rand32())
    e.dst = struct.pack('!xxI', rand32())
    return e
def swap(pkt):
    """Return a reversed-direction copy of ``pkt``; the argument is not modified.

    On a deep copy of the packet this exchanges the Ethernet src/dst and the
    IPv4 srcip/dstip, sets the TTL to ``255 - ttl`` and the IP id to 0.  When a
    TCP header is present, seq/ack are exchanged and the ACK flag is set; for
    TCP or UDP the source and destination ports are exchanged as well.
    """
    reply = deepcopy(pkt)

    # Link layer: reverse the direction of the frame.
    eth = reply.get_header(Ethernet)
    eth.src, eth.dst = eth.dst, eth.src

    # Network layer: reverse the addresses and reset per-packet fields.
    ipv4 = reply.get_header(IPv4)
    ipv4.srcip, ipv4.dstip = ipv4.dstip, ipv4.srcip
    ipv4.ttl = 255 - ipv4.ttl
    ipv4.ipid = 0

    # Transport layer: only TCP and UDP carry ports to exchange.
    if reply.has_header(TCP):
        transport = reply.get_header(TCP)
        transport.seq, transport.ack = transport.ack, transport.seq
        transport.ACK = 1
    elif reply.has_header(UDP):
        transport = reply.get_header(UDP)
    else:
        transport = None

    if transport is not None:
        transport.srcport, transport.dstport = transport.dstport, transport.srcport

    return reply
def firewall_tests():
    """Build and return the Switchyard scenario for the firewall rule set.

    The scenario registers ``firewall_rules`` as the file
    'firewall_rules.txt', adds two interfaces (eth0 and eth1), and then queues
    expected events in three groups:

      1. packets that match a permit rule and must be forwarded out the
         opposite interface (rules 3-10 and 12);
      2. packets that match the explicit deny rules 1 and 2 -- only input
         events are expected, i.e. nothing is forwarded;
      3. other IPv4 traffic that falls through to rule 13 (not forwarded), and
         non-IPv4 traffic (ARP, IPv6), which is expected to be forwarded.

    Returns:
        The populated ``Scenario`` object.

    NOTE(review): several packets use unconstrained random addresses
    (``rand32()``).  On rare draws these can land inside 192.168.42.0/24 or
    192.168.0.0/16 and match a different rule than the description names.
    """
    s = Scenario("Firewall tests")
    s.add_file('firewall_rules.txt', firewall_rules)

    # two ethernet ports; no IP addresses assigned to
    # them.  eth0 is internal network-facing, and eth1
    # is external network-facing.
    s.add_interface('eth0', '00:00:00:00:0b:01')
    s.add_interface('eth1', '00:00:00:00:0b:02')

    # first set of tests: check that packets that should be allowed through
    # are allowed through

    # internal web server (rule 3 outbound, rule 4 for the reversed packet)
    t = TCP()
    t.SYN = 1
    t.srcport = 80
    t.dstport = rand16(10000)
    t.seq = rand32()
    t.ack = rand32()
    t.window = rand16(8192)
    ip = IPv4()
    ip.srcip = '192.168.13.13'
    ip.dstip = rand32()
    ip.ttl = rand8(12)
    ip.protocol = IPProtocol.TCP
    pkt = mketh() + ip + t
    s.expect(PacketInputEvent('eth0',pkt),
        'Packet arriving on eth0 should be permitted since it matches rule 3.')
    s.expect(PacketOutputEvent('eth1',pkt),
        'Packet forwarded out eth1; permitted since it matches rule 3.')
    s.expect(PacketInputEvent('eth1',swap(pkt)),
        'Packet arriving on eth1 should be permitted since it matches rule 4.')
    s.expect(PacketOutputEvent('eth0',swap(pkt)),
        'Packet forwarded out eth0; permitted since it matches rule 4.')

    # DNS query from an internal host and its reply (rules 5 and 6)
    u = UDP()
    u.srcport = rand16(10000)
    u.dstport = 53
    ip.srcip = int(IPv4Address('192.168.113.0')) | rand8()
    ip.dstip = rand32()
    ip.ttl = rand8(16)
    ip.protocol = IPProtocol.UDP
    pkt = mketh() + ip + u
    s.expect(PacketInputEvent('eth0', pkt),
        'Packet arriving on eth0 should be permitted since it matches rule 5.')
    s.expect(PacketOutputEvent('eth1', pkt),
        'Packet forwarded out eth1; permitted since it matches rule 5.')
    s.expect(PacketInputEvent('eth1', swap(pkt)),
        'Packet arriving on eth1 should be permitted since it matches rule 6.')
    s.expect(PacketOutputEvent('eth0', swap(pkt)),
        'Packet forwarded out eth0; permitted since it matches rule 6.')

    # HTTPS from an internal host and its reply (rules 9 and 10)
    t = TCP()
    t.srcport = rand16(10000)
    t.dstport = 443
    t.seq = rand32()
    t.ack = rand32()
    t.SYN = 1
    ip.srcip = int(IPv4Address('192.168.113.0')) | rand8()
    ip.dstip = rand32()
    ip.ttl = rand8(16)
    ip.protocol = IPProtocol.TCP
    pkt = mketh() + ip + t
    s.expect(PacketInputEvent('eth0', pkt),
        'Packet arriving on eth0 should be permitted since it matches rule 9.')
    s.expect(PacketOutputEvent('eth1', pkt),
        'Packet forwarded out eth1; permitted since it matches rule 9.')
    s.expect(PacketInputEvent('eth1', swap(pkt)),
        'Packet arriving on eth1 should be permitted since it matches rule 10.')
    s.expect(PacketOutputEvent('eth0', swap(pkt)),
        'Packet forwarded out eth0; permitted since it matches rule 10.')

    # next few tests hit rules that have rate limits, but these should
    # all be allowed since the payloads are small enough.

    # HTTP from an internal host and its reply (rules 7 and 8)
    t = TCP()
    t.srcport = rand16(10000)
    t.dstport = 80
    t.seq = rand32()
    t.ack = rand32()
    t.window = rand16()
    t.SYN = 1
    ip.srcip = int(IPv4Address('192.168.213.0')) | rand8()
    ip.dstip = rand32()
    ip.ttl = rand8(16)
    ip.protocol = IPProtocol.TCP
    pkt = mketh() + ip + t
    s.expect(PacketInputEvent('eth0', pkt),
        'Packet arriving on eth0 should be permitted since it matches rule 7.')
    s.expect(PacketOutputEvent('eth1', pkt),
        'Packet forwarded out eth1; permitted since it matches rule 7.')
    s.expect(PacketInputEvent('eth1', swap(pkt)),
        'Packet arriving on eth1 should be permitted since it matches rule 8.')
    s.expect(PacketOutputEvent('eth0', swap(pkt)),
        'Packet forwarded out eth0; permitted since it matches rule 8.')

    # ICMP in both directions (rule 12); the TTL set above is reused here
    ip.srcip = rand32()
    ip.dstip = rand32()
    ip.protocol = IPProtocol.ICMP
    pkt = mketh() + ip + ICMP()
    s.expect(PacketInputEvent('eth0', pkt),
        'Packet arriving on eth0 should be permitted since it matches rule 12.')
    s.expect(PacketOutputEvent('eth1', pkt),
        'Packet forwarded out eth1; permitted since it matches rule 12.')
    s.expect(PacketInputEvent('eth1', swap(pkt)),
        'Packet arriving on eth1 should be permitted since it matches rule 12.')
    s.expect(PacketOutputEvent('eth0', swap(pkt)),
        'Packet forwarded out eth0; permitted since it matches rule 12.')

    # second set of tests: check that packets matching the explicit deny
    # rules (1 and 2) are not forwarded.
    t = TCP()
    t.srcport = rand16(10000)
    t.dstport = 443
    t.seq = rand32()
    t.ack = rand32()
    t.window = rand16()
    t.ACK = 1
    t.PSH = 1
    ip.srcip = int(IPv4Address('192.168.42.0')) | rand8()
    ip.dstip = rand32()
    ip.ttl = rand8(16)
    ip.protocol = IPProtocol.TCP
    pkt = mketh() + ip + t
    # shouldn't matter which interface one of these packets arrives on;
    # rule matching should be done regardless...
    s.expect(PacketInputEvent('eth0', pkt),
        'Packet arriving on eth0 should be blocked since it matches rule 1.')
    s.expect(PacketInputEvent('eth1', pkt),
        'Packet arriving on eth1 should be blocked since it matches rule 1.')
    s.expect(PacketInputEvent('eth0', swap(pkt)),
        'Packet arriving on eth0 should be blocked since it matches rule 2.')
    s.expect(PacketInputEvent('eth1', swap(pkt)),
        'Packet arriving on eth1 should be blocked since it matches rule 2.')

    # third set of tests: check that other IPv4 packets are blocked (rule 13),
    # but that other types of packets are allowed (i.e., non-IPv4)

    # UDP packet with random source and dest IP addrs and random ports; it is
    # expected to match none of the permit rules and so be blocked by the
    # catch-all rule 13.
    ip = IPv4()
    ip.protocol = IPProtocol.UDP
    ip.srcip = rand32(2**24)
    ip.dstip = rand32(2**24)
    udp = UDP()
    udp.srcport = rand16()
    udp.dstport = rand16()
    pkt = mketh() + ip + udp + b'Hello, little UDP packet!'
    s.expect(PacketInputEvent('eth0', pkt),
        'UDP packet arrives on eth0; should be blocked since addresses it contains aren\'t explicitly allowed (rule 13).')
    s.expect(PacketInputEvent('eth1', pkt),
        'UDP packet arrives on eth1; should be blocked since addresses it contains aren\'t explicitly allowed (rule 13).')

    # ARP is not IPv4, so no rule applies: it should be forwarded in both
    # directions.
    pkt = create_ip_arp_request(mketh().src, rand32(), rand32())
    s.expect(PacketInputEvent('eth0', pkt),
        'ARP request arrives on eth0; should be allowed since it does not match any rule')
    s.expect(PacketOutputEvent('eth1', pkt),
        'ARP request should be forwarded out eth1 since it does not match any rule')
    s.expect(PacketInputEvent('eth1', pkt),
        'ARP request arrives on eth1; should be allowed since it does not match any rule')
    s.expect(PacketOutputEvent('eth0', pkt),
        'ARP request should be forwarded out eth0 since it does not match any rule')

    # IPv6 is likewise not covered by any rule: it should be forwarded in
    # both directions.
    e = mketh()
    e.ethertype = EtherType.IPv6
    ip = IPv6()
    ip.nextheader = IPProtocol.ICMPv6
    pkt = e + ip + ICMPv6()
    s.expect(PacketInputEvent('eth0', pkt),
        'IPv6 packet arrives on eth0; should be allowed since it does not match any rule.')
    s.expect(PacketOutputEvent('eth1', pkt),
        'IPv6 packet forwarded out eth1 since it does not match any rule.')
    s.expect(PacketInputEvent('eth1', pkt),
        'IPv6 packet arrives on eth1; should be allowed since it does not match any rule.')
    s.expect(PacketOutputEvent('eth0', pkt),
        'IPv6 packet forwarded out eth0 since it does not match any rule.')

    return s
# Build the scenario once at import time and expose it under the module-level
# name `scenario` (presumably the name the Switchyard test runner looks up
# when it loads this file -- confirm against the Switchyard documentation).
scenario = firewall_tests()