Skip to content

Commit

Permalink
Establish OpenWRT firewall rule parser
Browse files Browse the repository at this point in the history
This commit adds a firewall rule UCI parser to the OpenWRT backend. This commit
includes:
    - Fixing test_default.py to use the new parser
    - New tests in test_firewall.py
  • Loading branch information
jonathanunderwood committed Jul 28, 2020
1 parent 7837ad8 commit 8e4f186
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 115 deletions.
86 changes: 50 additions & 36 deletions netjsonconfig/backends/openwrt/converters/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,18 @@


class Firewall(OpenWrtConverter):
netjson_key = 'firewall'
intermediate_key = 'firewall'
_uci_types = ['defaults', 'forwarding', 'zone', 'rule']
_schema = schema['properties']['firewall']
netjson_key = "firewall"
intermediate_key = "firewall"
_uci_types = ["defaults", "forwarding", "zone", "rule"]
_schema = schema["properties"]["firewall"]

def to_intermediate_loop(self, block, result, index=None):
forwardings = self.__intermediate_forwardings(block.pop('forwardings', {}))
zones = self.__intermediate_zones(block.pop('zones', {}))
rules = self.__intermediate_rules(block.pop('rules', {}))
block.update({
'.type': 'defaults',
'.name': block.pop('id', 'defaults'),
})
result.setdefault('firewall', [])
result['firewall'] = [self.sorted_dict(block)] + forwardings + zones + rules
forwardings = self.__intermediate_forwardings(block.pop("forwardings", {}))
zones = self.__intermediate_zones(block.pop("zones", {}))
rules = self.__intermediate_rules(block.pop("rules", {}))
block.update({".type": "defaults", ".name": block.pop("id", "defaults")})
result.setdefault("firewall", [])
result["firewall"] = [self.sorted_dict(block)] + forwardings + zones + rules
return result

def __intermediate_forwardings(self, forwardings):
Expand All @@ -29,19 +26,26 @@ def __intermediate_forwardings(self, forwardings):
"""
result = []
for forwarding in forwardings:
resultdict = OrderedDict((('.name', self.__get_auto_name_forwarding(forwarding)),
('.type', 'forwarding')))
resultdict = OrderedDict(
(
(".name", self.__get_auto_name_forwarding(forwarding)),
(".type", "forwarding"),
)
)
resultdict.update(forwarding)
result.append(resultdict)
return result

def __get_auto_name_forwarding(self, forwarding):
if 'family' in forwarding.keys():
uci_name = self._get_uci_name('_'.join([forwarding['src'], forwarding['dest'],
forwarding['family']]))
if "family" in forwarding.keys():
uci_name = self._get_uci_name(
"_".join([forwarding["src"], forwarding["dest"], forwarding["family"]])
)
else:
uci_name = self._get_uci_name('_'.join([forwarding['src'], forwarding['dest']]))
return 'forwarding_{0}'.format(uci_name)
uci_name = self._get_uci_name(
"_".join([forwarding["src"], forwarding["dest"]])
)
return "forwarding_{0}".format(uci_name)

def __intermediate_zones(self, zones):
"""
Expand All @@ -50,14 +54,15 @@ def __intermediate_zones(self, zones):
"""
result = []
for zone in zones:
resultdict = OrderedDict((('.name', self.__get_auto_name_zone(zone)),
('.type', 'zone')))
resultdict = OrderedDict(
((".name", self.__get_auto_name_zone(zone)), (".type", "zone"))
)
resultdict.update(zone)
result.append(resultdict)
return result

def __get_auto_name_zone(self, zone):
return 'zone_{0}'.format(self._get_uci_name(zone['name']))
return "zone_{0}".format(self._get_uci_name(zone["name"]))

def __intermediate_rules(self, rules):
"""
Expand All @@ -66,24 +71,33 @@ def __intermediate_rules(self, rules):
"""
result = []
for rule in rules:
if 'config_name' in rule:
del rule['config_name']
resultdict = OrderedDict((('.name', self.__get_auto_name_rule(rule)),
('.type', 'rule')))
if "config_name" in rule:
del rule["config_name"]
resultdict = OrderedDict(
((".name", self.__get_auto_name_rule(rule)), (".type", "rule"))
)
resultdict.update(rule)
result.append(resultdict)
return result

def __get_auto_name_rule(self, rule):
return 'rule_{0}'.format(self._get_uci_name(rule['name']))
return "rule_{0}".format(self._get_uci_name(rule["name"]))

def to_netjson_loop(self, block, result, index):
result['firewall'] = self.__netjson_firewall(block)
return result
result.setdefault("firewall", {})

block.pop(".name")
_type = block.pop(".type")

if _type == "rule":
rule = self.__netjson_rule(block)
result["firewall"].setdefault("rules", [])
result["firewall"]["rules"].append(rule)

return self.type_cast(result)

def __netjson_rule(self, rule):
if "enabled" in rule:
rule["enabled"] = rule.pop("enabled") == "1"

def __netjson_firewall(self, firewall):
del firewall['.type']
_name = firewall.pop('.name')
if _name != 'firewall':
firewall['id'] = _name
return self.type_cast(firewall)
return self.type_cast(rule)
157 changes: 78 additions & 79 deletions tests/openwrt/test_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,78 +8,72 @@ class TestDefault(unittest.TestCase, _TabsMixin):
maxDiff = None

def test_render_default(self):
o = OpenWrt({
"luci": [
{
"config_name": "core",
"config_value": "main",
"lang": "auto",
"resourcebase": "/luci-static/resources",
"mediaurlbase": "/luci-static/bootstrap",
"number": 4,
"boolean": True
}
],
"firewall": {
"rules": [
{
"config_name": "rule",
"name": "Allow-MLD",
"src": "wan",
"proto": "icmp",
"src_ip": "fe80::/10",
"family": "ipv6",
"target": "ACCEPT",
"icmp_type": [
"130/0",
"131/0",
"132/0",
"143/0"
]
},
o = OpenWrt(
{
"luci": [
{
"config_name": "rule",
"name": "Rule2",
"src": "wan",
"proto": "icmp",
"src_ip": "192.168.1.1/24",
"family": "ipv4",
"target": "ACCEPT",
"icmp_type": [
"130/0",
"131/0",
"132/0",
"143/0"
]
"config_name": "core",
"config_value": "main",
"lang": "auto",
"resourcebase": "/luci-static/resources",
"mediaurlbase": "/luci-static/bootstrap",
"number": 4,
"boolean": True,
}
]
],
"firewall": {
"rules": [
{
"name": "Allow-MLD",
"src": "wan",
"proto": "icmp",
"src_ip": "fe80::/10",
"family": "ipv6",
"target": "ACCEPT",
"icmp_type": ["130/0", "131/0", "132/0", "143/0"],
},
{
"name": "Rule2",
"src": "wan",
"proto": "icmp",
"src_ip": "192.168.1.1/24",
"family": "ipv4",
"target": "ACCEPT",
"icmp_type": ["130/0", "131/0", "132/0", "143/0"],
},
]
},
}
})
expected = self._tabs("""package firewall
)
expected = self._tabs(
"""\
package firewall
config defaults 'defaults'
config rule 'rule_Allow_MLD'
option family 'ipv6'
list icmp_type '130/0'
list icmp_type '131/0'
list icmp_type '132/0'
list icmp_type '143/0'
option name 'Allow-MLD'
option proto 'icmp'
option src 'wan'
option proto 'icmp'
option src_ip 'fe80::/10'
option family 'ipv6'
option target 'ACCEPT'
config rule 'rule_Rule2'
option family 'ipv4'
list icmp_type '130/0'
list icmp_type '131/0'
list icmp_type '132/0'
list icmp_type '143/0'
config rule 'rule_Rule2'
option name 'Rule2'
option proto 'icmp'
option src 'wan'
option proto 'icmp'
option src_ip '192.168.1.1/24'
option family 'ipv4'
option target 'ACCEPT'
list icmp_type '130/0'
list icmp_type '131/0'
list icmp_type '132/0'
list icmp_type '143/0'
package luci
Expand Down Expand Up @@ -142,54 +136,59 @@ def test_parse_default(self):
)
o = OpenWrt(native=native)
expected = {
"luci": [
"led": [
{
"config_name": "core",
"config_value": "main",
"lang": "auto",
"resourcebase": "/luci-static/resources",
"mediaurlbase": "/luci-static/bootstrap",
"number": "4",
"boolean": "1",
"dev": "1-1.1",
"interval": 50,
"name": "USB1",
"sysfs": "tp-link:green:usb1",
"trigger": "usbdev",
}
],
"interfaces": [{"name": "eth0", "type": "ethernet"}],
"firewall": {
"rules": [
{
"config_name": "rule",
"family": "ipv6",
"icmp_type": ["130/0", "131/0", "132/0", "143/0"],
"name": "Allow-MLD",
"src": "wan",
"proto": "icmp",
"src": "wan",
"src_ip": "fe80::/10",
"family": "ipv6",
"target": "ACCEPT",
"icmp_type": ["130/0", "131/0", "132/0", "143/0"]
}
]
},
"led": [
"luci": [
{
"name": "USB1",
"sysfs": "tp-link:green:usb1",
"trigger": "usbdev",
"dev": "1-1.1",
"interval": 50,
"boolean": "1",
"lang": "auto",
"mediaurlbase": "/luci-static/bootstrap",
"number": "4",
"resourcebase": "/luci-static/resources",
"config_value": "main",
"config_name": "core",
}
],
"interfaces": [{"name": "eth0", "type": "ethernet"}],
"system": [
{"test": "1", "config_name": "custom", "config_value": "custom"}
{"test": "1", "config_value": "custom", "config_name": "custom"}
],
}

print("*" * 80)
import json

print(json.dumps(o.config, indent=4))
print("*" * 80)
self.assertDictEqual(o.config, expected)

def test_skip(self):
o = OpenWrt({"skipme": {"enabled": True}})
self.assertEqual(o.render(), '')
self.assertEqual(o.render(), "")

def test_warning(self):
o = OpenWrt({"luci": [{"unrecognized": True}]})
self.assertEqual(o.render(), '')
self.assertEqual(o.render(), "")

def test_merge(self):
template = {
Expand Down Expand Up @@ -228,8 +227,8 @@ def test_merge(self):
self.assertEqual(o.config, expected)

def test_skip_nonlists(self):
o = OpenWrt({"custom_package": {'unknown': True}})
self.assertEqual(o.render(), '')
o = OpenWrt({"custom_package": {"unknown": True}})
self.assertEqual(o.render(), "")

def test_render_invalid_uci_name(self):
o = OpenWrt(
Expand Down
Loading

0 comments on commit 8e4f186

Please sign in to comment.