Skip to content

Commit

Permalink
[202205][acl-loader] Identity ICMP v4/v6 based on IP_PROTOCOL for cus…
Browse files Browse the repository at this point in the history
…tom ACL table types (#3003)

* [acl-loader] Identity ICMP v4/v6 based on IP_PROTOCOL for custom ACL table types (#2994)

What is the motivation for this PR?
When adding ICMPv6 ACL rules in custom ACL table type, current acl-loader will incorrectly treat the ACL table as IPv4. I open this PR to fix this bug and let acl-loader identify ICMP v4 or v6 based on IP_PROTOCOL.
Also fixed some typo in UT of acl-loader to avoid confusion.

How did you do it?
In function convert_icmp, add one step to identify the rule is v4 or v6 based on IP_PROTOCOL.

How did you verify/test it?
Verified by UT.

Signed-off-by: Zhijian Li <[email protected]>

* update

---------

Signed-off-by: Zhijian Li <[email protected]>
  • Loading branch information
lizhijianrd authored Sep 29, 2023
1 parent d9bc820 commit ff8a064
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 6 deletions.
33 changes: 30 additions & 3 deletions acl_loader/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,14 @@ def is_table_l3v6(self, tname):
"""
return self.tables_db_info[tname]["type"].upper() == "L3V6"

def is_table_l3v4v6(self, tname):
"""
Check if ACL table type is L3V4V6
:param tname: ACL table name
:return: True if table type is L3V4V6 else False
"""
return self.tables_db_info[tname]["type"].upper() == "L3V4V6"

def is_table_l3(self, tname):
"""
Check if ACL table type is L3
Expand Down Expand Up @@ -568,9 +576,28 @@ def convert_ip(self, table_name, rule_idx, rule):
def convert_icmp(self, table_name, rule_idx, rule):
rule_props = {}

is_table_v6 = self.is_table_ipv6(table_name)
type_key = "ICMPV6_TYPE" if is_table_v6 else "ICMP_TYPE"
code_key = "ICMPV6_CODE" if is_table_v6 else "ICMP_CODE"
is_rule_v6 = False
if self.is_table_ipv6(table_name):
is_rule_v6 = True
elif self.is_table_l3v4v6(table_name):
# get the IP version type using Ether-Type.
try:
ether_type = rule.l2.config.ethertype
if ether_type == "ETHERTYPE_IPV6":
is_rule_v6 = True
except Exception as e:
pass
else:
# get the IP version type using IP_PROTOCOL.
try:
ip_protocol = rule.ip.config.protocol
if ip_protocol == "IP_ICMPV6" or int(ip_protocol) == self.ip_protocol_map["IP_ICMPV6"]:
is_rule_v6 = True
except Exception as e:
pass

type_key = "ICMPV6_TYPE" if is_rule_v6 else "ICMP_TYPE"
code_key = "ICMPV6_CODE" if is_rule_v6 else "ICMP_CODE"

if rule.icmp.config.type != "" and rule.icmp.config.type != "null":
icmp_type = rule.icmp.config.type
Expand Down
44 changes: 43 additions & 1 deletion tests/acl_input/acl1.json
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@
}
}
},
"2": {
"100": {
"config": {
"sequence-id": 100
},
Expand Down Expand Up @@ -285,6 +285,27 @@
}
}
}
},
"2": {
"actions": {
"config": {
"forwarding-action": "ACCEPT"
}
},
"config": {
"sequence-id": 2
},
"ip": {
"config": {
"protocol": "1"
}
},
"icmp": {
"config": {
"type": "136",
"code": "0"
}
}
}
}
},
Expand All @@ -310,6 +331,27 @@
"destination-ip-address": "fc02::/64"
}
}
},
"2": {
"actions": {
"config": {
"forwarding-action": "ACCEPT"
}
},
"config": {
"sequence-id": 2
},
"ip": {
"config": {
"protocol": "58"
}
},
"icmp": {
"config": {
"type": "136",
"code": "0"
}
}
}
}
},
Expand Down
27 changes: 25 additions & 2 deletions tests/acl_loader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ def test_icmp_translation(self, acl_loader):
def test_icmpv6_translation(self, acl_loader):
acl_loader.rules_info = {}
acl_loader.load_rules_from_file(os.path.join(test_path, 'acl_input/acl1.json'))
print(acl_loader.rules_info)
assert acl_loader.rules_info[("DATAACL_2", "RULE_1")] == {
"ICMPV6_TYPE": 1,
"ICMPV6_CODE": 0,
Expand All @@ -135,6 +134,30 @@ def test_icmpv6_translation(self, acl_loader):
"PRIORITY": "9900"
}

def test_icmp_translation_in_custom_acl_table_type(self, acl_loader):
acl_loader.rules_info = {}
acl_loader.load_rules_from_file(os.path.join(test_path, 'acl_input/acl1.json'))
assert acl_loader.rules_info[("BMC_ACL_NORTHBOUND", "RULE_2")]
assert acl_loader.rules_info[("BMC_ACL_NORTHBOUND", "RULE_2")] == {
"ICMP_TYPE": 136,
"ICMP_CODE": 0,
"IP_PROTOCOL": 1,
"PACKET_ACTION": "FORWARD",
"PRIORITY": "9998"
}

def test_icmpv6_translation_in_custom_acl_table_type(self, acl_loader):
acl_loader.rules_info = {}
acl_loader.load_rules_from_file(os.path.join(test_path, 'acl_input/acl1.json'))
assert acl_loader.rules_info[("BMC_ACL_NORTHBOUND_V6", "RULE_2")]
assert acl_loader.rules_info[("BMC_ACL_NORTHBOUND_V6", "RULE_2")] == {
"ICMPV6_TYPE": 136,
"ICMPV6_CODE": 0,
"IP_PROTOCOL": 58,
"PACKET_ACTION": "FORWARD",
"PRIORITY": "9998"
}

def test_ingress_default_deny_rule(self, acl_loader):
acl_loader.set_mirror_stage("ingress")
acl_loader.get_session_name = mock.MagicMock(return_value="everflow_session_mock")
Expand Down Expand Up @@ -208,7 +231,7 @@ def ttest_icmp_fields_with_non_icmpv6_protocol(self, acl_loader):
assert not acl_loader.rules_info.get("RULE_1")


def test_icmp_fields_with_non_tcp_protocol(self, acl_loader):
def test_tcp_fields_with_non_tcp_protocol(self, acl_loader):
acl_loader.rules_info = {}
acl_loader.load_rules_from_file(os.path.join(test_path, 'acl_input/tcp_bad_protocol_number.json'))
assert not acl_loader.rules_info.get("RULE_1")
Expand Down

0 comments on commit ff8a064

Please sign in to comment.