From 29957e335b37e96794012f1b285123313b89b821 Mon Sep 17 00:00:00 2001 From: "Jonathan G. Underwood" Date: Tue, 28 Jul 2020 10:58:48 +0100 Subject: [PATCH] Establish OpenWRT firewall rule parser This commit adds a firewall rule UCI parser to the OpenWRT backend. This commit includes: - Fixing test_default.py to use the new parser - New tests in test_firewall.py --- .../backends/openwrt/converters/firewall.py | 86 ++++++---- tests/openwrt/test_default.py | 157 +++++++++--------- tests/openwrt/test_firewall.py | 140 ++++++++++++++++ 3 files changed, 268 insertions(+), 115 deletions(-) create mode 100644 tests/openwrt/test_firewall.py diff --git a/netjsonconfig/backends/openwrt/converters/firewall.py b/netjsonconfig/backends/openwrt/converters/firewall.py index f474b3fe4..7d79a4b6f 100644 --- a/netjsonconfig/backends/openwrt/converters/firewall.py +++ b/netjsonconfig/backends/openwrt/converters/firewall.py @@ -5,21 +5,18 @@ class Firewall(OpenWrtConverter): - netjson_key = 'firewall' - intermediate_key = 'firewall' - _uci_types = ['defaults', 'forwarding', 'zone', 'rule'] - _schema = schema['properties']['firewall'] + netjson_key = "firewall" + intermediate_key = "firewall" + _uci_types = ["defaults", "forwarding", "zone", "rule"] + _schema = schema["properties"]["firewall"] def to_intermediate_loop(self, block, result, index=None): - forwardings = self.__intermediate_forwardings(block.pop('forwardings', {})) - zones = self.__intermediate_zones(block.pop('zones', {})) - rules = self.__intermediate_rules(block.pop('rules', {})) - block.update({ - '.type': 'defaults', - '.name': block.pop('id', 'defaults'), - }) - result.setdefault('firewall', []) - result['firewall'] = [self.sorted_dict(block)] + forwardings + zones + rules + forwardings = self.__intermediate_forwardings(block.pop("forwardings", {})) + zones = self.__intermediate_zones(block.pop("zones", {})) + rules = self.__intermediate_rules(block.pop("rules", {})) + block.update({".type": "defaults", ".name": block.pop("id", "defaults")}) + result.setdefault("firewall", []) + result["firewall"] = [self.sorted_dict(block)] + forwardings + zones + rules return result def __intermediate_forwardings(self, forwardings): @@ -29,19 +26,26 @@ def __intermediate_forwardings(self, forwardings): """ result = [] for forwarding in forwardings: - resultdict = OrderedDict((('.name', self.__get_auto_name_forwarding(forwarding)), - ('.type', 'forwarding'))) + resultdict = OrderedDict( + ( + (".name", self.__get_auto_name_forwarding(forwarding)), + (".type", "forwarding"), + ) + ) resultdict.update(forwarding) result.append(resultdict) return result def __get_auto_name_forwarding(self, forwarding): - if 'family' in forwarding.keys(): - uci_name = self._get_uci_name('_'.join([forwarding['src'], forwarding['dest'], - forwarding['family']])) + if "family" in forwarding.keys(): + uci_name = self._get_uci_name( + "_".join([forwarding["src"], forwarding["dest"], forwarding["family"]]) + ) else: - uci_name = self._get_uci_name('_'.join([forwarding['src'], forwarding['dest']])) - return 'forwarding_{0}'.format(uci_name) + uci_name = self._get_uci_name( + "_".join([forwarding["src"], forwarding["dest"]]) + ) + return "forwarding_{0}".format(uci_name) def __intermediate_zones(self, zones): """ @@ -50,14 +54,15 @@ def __intermediate_zones(self, zones): """ result = [] for zone in zones: - resultdict = OrderedDict((('.name', self.__get_auto_name_zone(zone)), - ('.type', 'zone'))) + resultdict = OrderedDict( + ((".name", self.__get_auto_name_zone(zone)), (".type", "zone")) + ) resultdict.update(zone) result.append(resultdict) return result def __get_auto_name_zone(self, zone): - return 'zone_{0}'.format(self._get_uci_name(zone['name'])) + return "zone_{0}".format(self._get_uci_name(zone["name"])) def __intermediate_rules(self, rules): """ @@ -66,24 +71,33 @@ def __intermediate_rules(self, rules): """ result = [] for rule in rules: - if 'config_name' in rule: - del rule['config_name'] - resultdict = OrderedDict((('.name', self.__get_auto_name_rule(rule)), - ('.type', 'rule'))) + if "config_name" in rule: + del rule["config_name"] + resultdict = OrderedDict( + ((".name", self.__get_auto_name_rule(rule)), (".type", "rule")) + ) resultdict.update(rule) result.append(resultdict) return result def __get_auto_name_rule(self, rule): - return 'rule_{0}'.format(self._get_uci_name(rule['name'])) + return "rule_{0}".format(self._get_uci_name(rule["name"])) def to_netjson_loop(self, block, result, index): - result['firewall'] = self.__netjson_firewall(block) - return result + result.setdefault("firewall", {}) + + block.pop(".name") + _type = block.pop(".type") + + if _type == "rule": + rule = self.__netjson_rule(block) + result["firewall"].setdefault("rules", []) + result["firewall"]["rules"].append(rule) + + return self.type_cast(result) + + def __netjson_rule(self, rule): + if "enabled" in rule: + rule["enabled"] = rule.pop("enabled") == "1" - def __netjson_firewall(self, firewall): - del firewall['.type'] - _name = firewall.pop('.name') - if _name != 'firewall': - firewall['id'] = _name - return self.type_cast(firewall) + return self.type_cast(rule) diff --git a/tests/openwrt/test_default.py b/tests/openwrt/test_default.py index 43b32c3de..6a8e314e1 100644 --- a/tests/openwrt/test_default.py +++ b/tests/openwrt/test_default.py @@ -8,78 +8,72 @@ class TestDefault(unittest.TestCase, _TabsMixin): maxDiff = None def test_render_default(self): - o = OpenWrt({ - "luci": [ - { - "config_name": "core", - "config_value": "main", - "lang": "auto", - "resourcebase": "/luci-static/resources", - "mediaurlbase": "/luci-static/bootstrap", - "number": 4, - "boolean": True - } - ], - "firewall": { - "rules": [ - { - "config_name": "rule", - "name": "Allow-MLD", - "src": "wan", - "proto": "icmp", - "src_ip": "fe80::/10", - "family": "ipv6", - "target": "ACCEPT", - "icmp_type": [ - "130/0", - "131/0", - "132/0", - "143/0" - ] - }, + o = OpenWrt( + { + "luci": [ { - "config_name": "rule", - "name": "Rule2", - "src": "wan", - "proto": "icmp", - "src_ip": "192.168.1.1/24", - "family": "ipv4", - "target": "ACCEPT", - "icmp_type": [ - "130/0", - "131/0", - "132/0", - "143/0" - ] + "config_name": "core", + "config_value": "main", + "lang": "auto", + "resourcebase": "/luci-static/resources", + "mediaurlbase": "/luci-static/bootstrap", + "number": 4, + "boolean": True, } - ] + ], + "firewall": { + "rules": [ + { + "name": "Allow-MLD", + "src": "wan", + "proto": "icmp", + "src_ip": "fe80::/10", + "family": "ipv6", + "target": "ACCEPT", + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], + }, + { + "name": "Rule2", + "src": "wan", + "proto": "icmp", + "src_ip": "192.168.1.1/24", + "family": "ipv4", + "target": "ACCEPT", + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], + }, + ] + }, } - }) - expected = self._tabs("""package firewall + ) + expected = self._tabs( + """\ +package firewall + +config defaults 'defaults' config rule 'rule_Allow_MLD' - option family 'ipv6' - list icmp_type '130/0' - list icmp_type '131/0' - list icmp_type '132/0' - list icmp_type '143/0' option name 'Allow-MLD' - option proto 'icmp' option src 'wan' + option proto 'icmp' option src_ip 'fe80::/10' + option family 'ipv6' option target 'ACCEPT' - -config rule 'rule_Rule2' - option family 'ipv4' list icmp_type '130/0' list icmp_type '131/0' list icmp_type '132/0' list icmp_type '143/0' + +config rule 'rule_Rule2' option name 'Rule2' - option proto 'icmp' option src 'wan' + option proto 'icmp' option src_ip '192.168.1.1/24' + option family 'ipv4' option target 'ACCEPT' + list icmp_type '130/0' + list icmp_type '131/0' + list icmp_type '132/0' + list icmp_type '143/0' package luci @@ -142,54 +136,59 @@ def test_parse_default(self): ) o = OpenWrt(native=native) expected = { - "luci": [ + "led": [ { - "config_name": "core", - "config_value": "main", - "lang": "auto", - "resourcebase": "/luci-static/resources", - "mediaurlbase": "/luci-static/bootstrap", - "number": "4", - "boolean": "1", + "dev": "1-1.1", + "interval": 50, + "name": "USB1", + "sysfs": "tp-link:green:usb1", + "trigger": "usbdev", } ], + "interfaces": [{"name": "eth0", "type": "ethernet"}], "firewall": { "rules": [ { - "config_name": "rule", + "family": "ipv6", + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], "name": "Allow-MLD", - "src": "wan", "proto": "icmp", + "src": "wan", "src_ip": "fe80::/10", - "family": "ipv6", "target": "ACCEPT", - "icmp_type": ["130/0", "131/0", "132/0", "143/0"] } ] }, - "led": [ + "luci": [ { - "name": "USB1", - "sysfs": "tp-link:green:usb1", - "trigger": "usbdev", - "dev": "1-1.1", - "interval": 50, + "boolean": "1", + "lang": "auto", + "mediaurlbase": "/luci-static/bootstrap", + "number": "4", + "resourcebase": "/luci-static/resources", + "config_value": "main", + "config_name": "core", } ], - "interfaces": [{"name": "eth0", "type": "ethernet"}], "system": [ - {"test": "1", "config_name": "custom", "config_value": "custom"} + {"test": "1", "config_value": "custom", "config_name": "custom"} ], } + + print("*" * 80) + import json + + print(json.dumps(o.config, indent=4)) + print("*" * 80) self.assertDictEqual(o.config, expected) def test_skip(self): o = OpenWrt({"skipme": {"enabled": True}}) - self.assertEqual(o.render(), '') + self.assertEqual(o.render(), "") def test_warning(self): o = OpenWrt({"luci": [{"unrecognized": True}]}) - self.assertEqual(o.render(), '') + self.assertEqual(o.render(), "") def test_merge(self): template = { @@ -228,8 +227,8 @@ def test_merge(self): self.assertEqual(o.config, expected) def test_skip_nonlists(self): - o = OpenWrt({"custom_package": {'unknown': True}}) - self.assertEqual(o.render(), '') + o = OpenWrt({"custom_package": {"unknown": True}}) + self.assertEqual(o.render(), "") def test_render_invalid_uci_name(self): o = OpenWrt( diff --git a/tests/openwrt/test_firewall.py b/tests/openwrt/test_firewall.py new file mode 100644 index 000000000..974490b92 --- /dev/null +++ b/tests/openwrt/test_firewall.py @@ -0,0 +1,140 @@ +import textwrap +import unittest + +from netjsonconfig import OpenWrt +from netjsonconfig.utils import _TabsMixin + + +class TestFirewall(unittest.TestCase, _TabsMixin): + maxDiff = None + + _rule_1_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-MLD", + "src": "wan", + "src_ip": "fe80::/10", + "proto": "icmp", + "icmp_type": ["130/0", "131/0", "132/0", "143/0"], + "target": "ACCEPT", + "family": "ipv6", + } + ] + } + } + + _rule_1_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'rule_Allow_MLD' + option name 'Allow-MLD' + option src 'wan' + option src_ip 'fe80::/10' + option proto 'icmp' + list icmp_type '130/0' + list icmp_type '131/0' + list icmp_type '132/0' + list icmp_type '143/0' + option target 'ACCEPT' + option family 'ipv6' + """ + ) + + def test_render_rule_1(self): + o = OpenWrt(self._rule_1_netjson) + expected = self._tabs(self._rule_1_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_1(self): + o = OpenWrt(native=self._rule_1_uci) + self.assertEqual(o.config, self._rule_1_netjson) + + _rule_2_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-DHCPv6", + "src": "wan", + "src_ip": "fc00::/6", + "dest_ip": "fc00::/6", + "dest_port": "546", + "proto": "udp", + "target": "ACCEPT", + "family": "ipv6", + } + ] + } + } + + _rule_2_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'rule_Allow_DHCPv6' + option name 'Allow-DHCPv6' + option src 'wan' + option src_ip 'fc00::/6' + option dest_ip 'fc00::/6' + option dest_port '546' + option proto 'udp' + option target 'ACCEPT' + option family 'ipv6' + """ + ) + + def test_render_rule_2(self): + o = OpenWrt(self._rule_2_netjson) + expected = self._tabs(self._rule_2_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_2(self): + o = OpenWrt(native=self._rule_2_uci) + self.assertEqual(o.config, self._rule_2_netjson) + + _rule_3_netjson = { + "firewall": { + "rules": [ + { + "name": "Allow-Ping", + "src": "wan", + "proto": "icmp", + "family": "ipv4", + "icmp_type": ["echo-request"], + "target": "ACCEPT", + "enabled": False, + } + ] + } + } + + _rule_3_uci = textwrap.dedent( + """\ + package firewall + + config defaults 'defaults' + + config rule 'rule_Allow_Ping' + option name 'Allow-Ping' + option src 'wan' + option proto 'icmp' + option family 'ipv4' + list icmp_type 'echo-request' + option target 'ACCEPT' + option enabled '0' + """ + ) + + def test_render_rule_3(self): + o = OpenWrt(self._rule_3_netjson) + expected = self._tabs(self._rule_3_uci) + self.assertEqual(o.render(), expected) + + def test_parse_rule_3(self): + o = OpenWrt(native=self._rule_3_uci) + self.assertEqual(o.config, self._rule_3_netjson)