Skip to content

Commit

Permalink
Update az aro permissions validation to mirror RP frontend validati…
Browse files Browse the repository at this point in the history
…on (#3395)

* Update az aro permissions validation to mirror RP frontend validation

* refactor can_do_action to return boolean and shift error reporting to validate_resource
  • Loading branch information
tsatam authored Feb 19, 2024
1 parent 5ac65fd commit a861f41
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 13 deletions.
30 changes: 20 additions & 10 deletions python/az/aro/azext_aro/_dynamic_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,29 @@

def can_do_action(perms, action):
for perm in perms:
for not_action in perm.not_actions:
match = re.escape(not_action)
match = re.match("(?i)^" + match.replace(r"\*", ".*") + "$", action)
if match:
return f"{action} permission is disabled"
matched = False

for perm_action in perm.actions:
match = re.escape(perm_action)
match = re.match("(?i)^" + match.replace(r"\*", ".*") + "$", action)
if match:
return None
matched = True
break

if not matched:
continue

for not_action in perm.not_actions:
match = re.escape(not_action)
match = re.match("(?i)^" + match.replace(r"\*", ".*") + "$", action)
if match:
matched = False
break

if matched:
return True

return f"{action} permission is missing"
return False


def validate_resource(client, key, resource, actions):
Expand All @@ -51,9 +62,8 @@ def validate_resource(client, key, resource, actions):
for action in actions:
perms, perms_copy = tee(perms)
perms_list = list(perms_copy)
error = can_do_action(perms_list, action)
if error is not None:
row = [key, resource['name'], log_entry_type["error"], error]
if not can_do_action(perms_list, action):
row = [key, resource['name'], log_entry_type["error"], f"{action} permission is missing"]
errors.append(row)

return errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,82 @@

from unittest.mock import Mock, patch
from azext_aro._dynamic_validators import (
dyn_validate_cidr_ranges, dyn_validate_subnet_and_route_tables, dyn_validate_vnet, dyn_validate_resource_permissions, dyn_validate_version
can_do_action,
dyn_validate_cidr_ranges,
dyn_validate_subnet_and_route_tables,
dyn_validate_vnet,
dyn_validate_resource_permissions,
dyn_validate_version
)

from azure.mgmt.authorization.models import Permission
import pytest

test_can_do_action_data = [
(
"empty permissions list",
[],
"Microsoft.Network/virtualNetworks/subnets/join/action",
False
),
(
"has permission - exact",
[
Permission(actions=["Microsoft.Compute/virtualMachines/*"], not_actions=[]),
Permission(actions=["Microsoft.Network/virtualNetworks/subnets/join/action"], not_actions=[]),
],
"Microsoft.Network/virtualNetworks/subnets/join/action",
True
),
(
"has permission - wildcard",
[
Permission(actions=["Microsoft.Network/virtualNetworks/subnets/*/action"], not_actions=[]),
],
"Microsoft.Network/virtualNetworks/subnets/join/action",
True
),
(
"has permission - exact, conflict",
[
Permission(actions=[], not_actions=["Microsoft.Network/virtualNetworks/subnets/join/action"]),
Permission(actions=["Microsoft.Network/virtualNetworks/subnets/join/action"], not_actions=[]),
],
"Microsoft.Network/virtualNetworks/subnets/join/action",
True
),
(
"has permission excluded - exact",
[
Permission(actions=["Microsoft.Network/*"], not_actions=["Microsoft.Network/virtualNetworks/subnets/join/action"]),
],
"Microsoft.Network/virtualNetworks/subnets/join/action",
False
),
(
"has permission excluded - wildcard",
[
Permission(actions=["Microsoft.Network/*"], not_actions=["Microsoft.Network/virtualNetworks/subnets/*/action"]),
],
"Microsoft.Network/virtualNetworks/subnets/join/action",
False
)
]


@pytest.mark.parametrize(
"test_description, perms, action, expected",
test_can_do_action_data,
ids=[i[0] for i in test_can_do_action_data]
)
def test_can_do_action(
test_description, perms, action, expected
):
actual = can_do_action(perms, action)

if actual != expected:
raise Exception(f"Error mismatch, expected: {expected}, actual: {actual}")


test_validate_cidr_data = [
(
Expand Down Expand Up @@ -166,7 +236,7 @@ def test_validate_cidr(
"child_name_1": None
},
Mock(),
"Microsoft.Network/routeTables/join/action permission is disabled"
"Microsoft.Network/routeTables/join/action permission is missing"
),
(
"should return missing permission when actions are not present",
Expand Down Expand Up @@ -301,7 +371,7 @@ def test_validate_subnets(
"child_name_1": None
},
Mock(),
"Microsoft.Network/virtualNetworks/join/action permission is disabled"
"Microsoft.Network/virtualNetworks/join/action permission is missing"
),
(
"should return missing permission when actions are not present",
Expand Down

0 comments on commit a861f41

Please sign in to comment.