diff --git a/prowler/providers/aws/services/dynamodb/dynamodb_table_cross_account_access/dynamodb_table_cross_account_access.py b/prowler/providers/aws/services/dynamodb/dynamodb_table_cross_account_access/dynamodb_table_cross_account_access.py index e3c1e817ca..702bbe07cf 100644 --- a/prowler/providers/aws/services/dynamodb/dynamodb_table_cross_account_access/dynamodb_table_cross_account_access.py +++ b/prowler/providers/aws/services/dynamodb/dynamodb_table_cross_account_access/dynamodb_table_cross_account_access.py @@ -6,6 +6,7 @@ class dynamodb_table_cross_account_access(Check): def execute(self): findings = [] + trusted_account_ids = dynamodb_client.audit_config.get("trusted_account_ids", []) for table in dynamodb_client.tables.values(): if table.policy is None: continue @@ -20,6 +21,7 @@ def execute(self): table.policy, dynamodb_client.audited_account, is_cross_account_allowed=False, + trusted_account_ids=trusted_account_ids ): report.status = "FAIL" report.status_extended = f"DynamoDB table {table.name} has a resource-based policy allowing cross account access." diff --git a/prowler/providers/aws/services/eventbridge/eventbridge_bus_cross_account_access/eventbridge_bus_cross_account_access.py b/prowler/providers/aws/services/eventbridge/eventbridge_bus_cross_account_access/eventbridge_bus_cross_account_access.py index 5fee9d0775..800b5bebf9 100644 --- a/prowler/providers/aws/services/eventbridge/eventbridge_bus_cross_account_access/eventbridge_bus_cross_account_access.py +++ b/prowler/providers/aws/services/eventbridge/eventbridge_bus_cross_account_access/eventbridge_bus_cross_account_access.py @@ -8,6 +8,7 @@ class eventbridge_bus_cross_account_access(Check): def execute(self): findings = [] + trusted_account_ids = eventbridge_client.audit_config.get("trusted_account_ids", []) for bus in eventbridge_client.buses.values(): if bus.policy is None: continue @@ -20,6 +21,7 @@ def execute(self): bus.policy, eventbridge_client.audited_account, is_cross_account_allowed=False, + trusted_account_ids=trusted_account_ids ): report.status = "FAIL" report.status_extended = ( diff --git a/prowler/providers/aws/services/eventbridge/eventbridge_schema_registry_cross_account_access/eventbridge_schema_registry_cross_account_access.py b/prowler/providers/aws/services/eventbridge/eventbridge_schema_registry_cross_account_access/eventbridge_schema_registry_cross_account_access.py index c3a2a29377..d4e04e172f 100644 --- a/prowler/providers/aws/services/eventbridge/eventbridge_schema_registry_cross_account_access/eventbridge_schema_registry_cross_account_access.py +++ b/prowler/providers/aws/services/eventbridge/eventbridge_schema_registry_cross_account_access/eventbridge_schema_registry_cross_account_access.py @@ -6,6 +6,7 @@ class eventbridge_schema_registry_cross_account_access(Check): def execute(self): findings = [] + trusted_account_ids = schema_client.audit_config.get("trusted_account_ids", []) for registry in schema_client.registries.values(): if registry.policy is None: continue @@ -16,6 +17,7 @@ def execute(self): registry.policy, schema_client.audited_account, is_cross_account_allowed=False, + trusted_account_ids=trusted_account_ids ): report.status = "FAIL" report.status_extended = f"EventBridge schema registry {registry.name} allows cross-account access." diff --git a/prowler/providers/aws/services/iam/lib/policy.py b/prowler/providers/aws/services/iam/lib/policy.py index b54809f4d6..4416f50308 100644 --- a/prowler/providers/aws/services/iam/lib/policy.py +++ b/prowler/providers/aws/services/iam/lib/policy.py @@ -387,6 +387,7 @@ def is_policy_public( is_cross_account_allowed=True, not_allowed_actions: list = [], check_cross_service_confused_deputy=False, + trusted_account_ids: list = None ) -> bool: """ Check if the policy allows public access to the resource. @@ -397,10 +398,19 @@ def is_policy_public( is_cross_account_allowed (bool): If the policy can allow cross-account access, default: True (https://docs.aws.amazon.com/IAM/latest/UserGuide/confused-deputy.html#cross-service-confused-deputy-prevention) not_allowed_actions (list): List of actions that are not allowed, default: []. If not_allowed_actions is empty, the function will not consider the actions in the policy. check_cross_service_confused_deputy (bool): If the policy is checked for cross-service confused deputy, default: False + trusted_account_ids (list): A list of trusted accound ids to reduce false positives on cross-account checks Returns: bool: True if the policy allows public access, False otherwise """ is_public = False + + if trusted_account_ids is None: + trusted_account_ids = [] + + trusted_accounts = set(trusted_account_ids) + if source_account: + trusted_accounts.add(source_account) + if policy: for statement in policy.get("Statement", []): # Only check allow statements @@ -414,13 +424,19 @@ def is_policy_public( isinstance(principal.get("AWS"), str) and source_account and not is_cross_account_allowed - and source_account not in principal.get("AWS", "") + and not any( + trusted_account in principal.get("AWS", "") + for trusted_account in trusted_accounts + ) ) or ( isinstance(principal.get("AWS"), list) and source_account and not is_cross_account_allowed - and not any( - source_account in principal_aws + and not all( + any( + trusted_account in principal_aws + for trusted_account in trusted_accounts + ) for principal_aws in principal["AWS"] ) ): diff --git a/prowler/providers/aws/services/s3/s3_bucket_cross_account_access/s3_bucket_cross_account_access.py b/prowler/providers/aws/services/s3/s3_bucket_cross_account_access/s3_bucket_cross_account_access.py index 3178a08aa1..31b8ed6013 100644 --- a/prowler/providers/aws/services/s3/s3_bucket_cross_account_access/s3_bucket_cross_account_access.py +++ b/prowler/providers/aws/services/s3/s3_bucket_cross_account_access/s3_bucket_cross_account_access.py @@ -6,6 +6,7 @@ class s3_bucket_cross_account_access(Check): def execute(self): findings = [] + trusted_account_ids = s3_client.audit_config.get("trusted_account_ids", []) for bucket in s3_client.buckets.values(): if bucket.policy is None: continue @@ -19,7 +20,10 @@ def execute(self): f"S3 Bucket {bucket.name} does not have a bucket policy." ) elif is_policy_public( - bucket.policy, s3_client.audited_account, is_cross_account_allowed=False + bucket.policy, + s3_client.audited_account, + is_cross_account_allowed=False, + trusted_account_ids=trusted_account_ids ): report.status = "FAIL" report.status_extended = f"S3 Bucket {bucket.name} has a bucket policy allowing cross account access." diff --git a/tests/providers/aws/services/iam/lib/policy_test.py b/tests/providers/aws/services/iam/lib/policy_test.py index 4687d38dab..d6a5eaf7b9 100644 --- a/tests/providers/aws/services/iam/lib/policy_test.py +++ b/tests/providers/aws/services/iam/lib/policy_test.py @@ -18,6 +18,7 @@ TRUSTED_AWS_ACCOUNT_NUMBER = "123456789012" NON_TRUSTED_AWS_ACCOUNT_NUMBER = "111222333444" +TRUSTED_AWS_ACCOUNT_NUMBER_LIST = ["123456789012", "123456789013", "123456789014"] TRUSTED_ORGANIZATION_ID = "o-123456789012" NON_TRUSTED_ORGANIZATION_ID = "o-111222333444" @@ -1652,6 +1653,49 @@ def test_policy_does_not_allow_cross_account_access_with_deny_effect(self): is_cross_account_allowed=False, ) + def test_cross_account_access_trusted_account_list(self): + policy = { + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": f"arn:aws:iam::{TRUSTED_AWS_ACCOUNT_NUMBER_LIST[0]}:root" + }, + "Action": "*", + "Resource": "*", + } + ] + } + assert not is_policy_public( + policy, + TRUSTED_AWS_ACCOUNT_NUMBER, + is_cross_account_allowed=False, + trusted_account_ids=TRUSTED_AWS_ACCOUNT_NUMBER_LIST + ) + + def test_cross_account_access_with_principal_list_trusted_account_list(self): + policy = { + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": [ + f"arn:aws:iam::{TRUSTED_AWS_ACCOUNT_NUMBER_LIST[0]}:root", + f"arn:aws:iam::{NON_TRUSTED_AWS_ACCOUNT_NUMBER}:root" + ] + }, + "Action": "*", + "Resource": "*", + } + ] + } + assert is_policy_public( + policy, + TRUSTED_AWS_ACCOUNT_NUMBER, + is_cross_account_allowed=False, + trusted_account_ids=TRUSTED_AWS_ACCOUNT_NUMBER_LIST + ) + def test_policy_allows_public_access_with_wildcard_principal(self): policy_allow_wildcard_principal = { "Statement": [