Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class dynamodb_table_cross_account_access(Check):
def execute(self):
findings = []
trusted_account_ids = dynamodb_client.audit_config.get("trusted_account_ids", [])
for table in dynamodb_client.tables.values():
if table.policy is None:
continue
Expand All @@ -20,6 +21,7 @@ def execute(self):
table.policy,
dynamodb_client.audited_account,
is_cross_account_allowed=False,
trusted_account_ids=trusted_account_ids
):
report.status = "FAIL"
report.status_extended = f"DynamoDB table {table.name} has a resource-based policy allowing cross account access."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
class eventbridge_bus_cross_account_access(Check):
def execute(self):
findings = []
trusted_account_ids = eventbridge_client.audit_config.get("trusted_account_ids", [])
for bus in eventbridge_client.buses.values():
if bus.policy is None:
continue
Expand All @@ -20,6 +21,7 @@ def execute(self):
bus.policy,
eventbridge_client.audited_account,
is_cross_account_allowed=False,
trusted_account_ids=trusted_account_ids
):
report.status = "FAIL"
report.status_extended = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class eventbridge_schema_registry_cross_account_access(Check):
def execute(self):
findings = []
trusted_account_ids = schema_client.audit_config.get("trusted_account_ids", [])
for registry in schema_client.registries.values():
if registry.policy is None:
continue
Expand All @@ -16,6 +17,7 @@ def execute(self):
registry.policy,
schema_client.audited_account,
is_cross_account_allowed=False,
trusted_account_ids=trusted_account_ids
):
report.status = "FAIL"
report.status_extended = f"EventBridge schema registry {registry.name} allows cross-account access."
Expand Down
22 changes: 19 additions & 3 deletions prowler/providers/aws/services/iam/lib/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def is_policy_public(
is_cross_account_allowed=True,
not_allowed_actions: list = [],
check_cross_service_confused_deputy=False,
trusted_account_ids: list = None
) -> bool:
"""
Check if the policy allows public access to the resource.
Expand All @@ -397,10 +398,19 @@ def is_policy_public(
is_cross_account_allowed (bool): If the policy can allow cross-account access, default: True (https://docs.aws.amazon.com/IAM/latest/UserGuide/confused-deputy.html#cross-service-confused-deputy-prevention)
not_allowed_actions (list): List of actions that are not allowed, default: []. If not_allowed_actions is empty, the function will not consider the actions in the policy.
check_cross_service_confused_deputy (bool): If the policy is checked for cross-service confused deputy, default: False
trusted_account_ids (list): A list of trusted accound ids to reduce false positives on cross-account checks
Returns:
bool: True if the policy allows public access, False otherwise
"""
is_public = False

if trusted_account_ids is None:
trusted_account_ids = []

trusted_accounts = set(trusted_account_ids)
if source_account:
trusted_accounts.add(source_account)

if policy:
for statement in policy.get("Statement", []):
# Only check allow statements
Expand All @@ -414,13 +424,19 @@ def is_policy_public(
isinstance(principal.get("AWS"), str)
and source_account
and not is_cross_account_allowed
and source_account not in principal.get("AWS", "")
and not any(
trusted_account in principal.get("AWS", "")
for trusted_account in trusted_accounts
)
) or (
isinstance(principal.get("AWS"), list)
and source_account
and not is_cross_account_allowed
and not any(
source_account in principal_aws
and not all(
any(
trusted_account in principal_aws
for trusted_account in trusted_accounts
)
for principal_aws in principal["AWS"]
)
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class s3_bucket_cross_account_access(Check):
def execute(self):
findings = []
trusted_account_ids = s3_client.audit_config.get("trusted_account_ids", [])
for bucket in s3_client.buckets.values():
if bucket.policy is None:
continue
Expand All @@ -19,7 +20,10 @@ def execute(self):
f"S3 Bucket {bucket.name} does not have a bucket policy."
)
elif is_policy_public(
bucket.policy, s3_client.audited_account, is_cross_account_allowed=False
bucket.policy,
s3_client.audited_account,
is_cross_account_allowed=False,
trusted_account_ids=trusted_account_ids
):
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has a bucket policy allowing cross account access."
Expand Down
44 changes: 44 additions & 0 deletions tests/providers/aws/services/iam/lib/policy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

TRUSTED_AWS_ACCOUNT_NUMBER = "123456789012"
NON_TRUSTED_AWS_ACCOUNT_NUMBER = "111222333444"
TRUSTED_AWS_ACCOUNT_NUMBER_LIST = ["123456789012", "123456789013", "123456789014"]

TRUSTED_ORGANIZATION_ID = "o-123456789012"
NON_TRUSTED_ORGANIZATION_ID = "o-111222333444"
Expand Down Expand Up @@ -1652,6 +1653,49 @@ def test_policy_does_not_allow_cross_account_access_with_deny_effect(self):
is_cross_account_allowed=False,
)

def test_cross_account_access_trusted_account_list(self):
policy = {
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": f"arn:aws:iam::{TRUSTED_AWS_ACCOUNT_NUMBER_LIST[0]}:root"
},
"Action": "*",
"Resource": "*",
}
]
}
assert not is_policy_public(
policy,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=False,
trusted_account_ids=TRUSTED_AWS_ACCOUNT_NUMBER_LIST
)

def test_cross_account_access_with_principal_list_trusted_account_list(self):
policy = {
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": [
f"arn:aws:iam::{TRUSTED_AWS_ACCOUNT_NUMBER_LIST[0]}:root",
f"arn:aws:iam::{NON_TRUSTED_AWS_ACCOUNT_NUMBER}:root"
]
},
"Action": "*",
"Resource": "*",
}
]
}
assert is_policy_public(
policy,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=False,
trusted_account_ids=TRUSTED_AWS_ACCOUNT_NUMBER_LIST
)

def test_policy_allows_public_access_with_wildcard_principal(self):
policy_allow_wildcard_principal = {
"Statement": [
Expand Down