diff --git a/python/IAM_TEST_ACTIONS/IAM_TEST_ACTIONS-test.py b/python/IAM_TEST_ACTIONS/IAM_TEST_ACTIONS-test.py new file mode 100644 index 00000000..593b561d --- /dev/null +++ b/python/IAM_TEST_ACTIONS/IAM_TEST_ACTIONS-test.py @@ -0,0 +1,328 @@ +# +# This file made available under CC0 1.0 Universal (https://creativecommons.org/publicdomain/zero/1.0/legalcode) +# +# Created with the Rule Development Kit: https://github.com/awslabs/aws-config-rdk +# Can be used stand-alone or with the Rule Compliance Engine: https://github.com/awslabs/aws-config-engine-for-compliance-as-code +# +import sys +import unittest +try: + from unittest.mock import MagicMock, patch, ANY +except ImportError: + import mock + from mock import MagicMock, patch, ANY +import botocore +from botocore.exceptions import ClientError +import json + +############## +# Parameters # +############## + +# Define the default resource to report to Config Rules +DEFAULT_RESOURCE_TYPE = 'AWS::IAM::Role' + +############# +# Main Code # +############# + +config_client_mock = MagicMock() +sts_client_mock = MagicMock() +iam_client_mock = MagicMock() + +class Boto3Mock(): + def client(self, client_name, *args, **kwargs): + if client_name == 'config': + return config_client_mock + elif client_name == 'sts': + return sts_client_mock + elif client_name == 'iam': + return iam_client_mock + else: + raise Exception("Attempting to create an unknown client") + +sys.modules['boto3'] = Boto3Mock() + +rule = __import__('IAM_TEST_ACTIONS') + +class ComplianceTest(unittest.TestCase): + invoking_event_default = '{\"configurationItem\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\",\"configurationItemCaptureTime\":\"2016-10-06T16:46:16.261Z\",\"awsAccountId\":\"123456789012\",\"configurationItemStatus\":\"OK\",\"resourceId\":\"AIDAICVB3PKAQMPEGDW2C\",\"resourceName\":\"somerolename\",\"configurationStateMd5Hash\":\"8f1ee69b297895a0f8bc5753eca68e96\",\"resourceCreationTime\":\"2016-10-06T16:46:10.489Z\",\"configurationStateId\":0,\"configurationItemVersion\":\"1.2\",\"ARN\":\"arn:aws:ec2:eu-west-1:123456789012:instance/i-00000000\",\"awsRegion\":\"eu-west-1\",\"availabilityZone\":\"eu-west-1\",\"resourceType\":\"AWS::IAM::Role\",\"tags\":{\"\":\"\"},\"relationships\":[{\"resourceId\":\"eipalloc-00000000\",\"resourceType\":\"AWS::IAM::Role\",\"name\":\"Is attached to ElasticIp\"}],\"configuration\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\"}},\"messageType\":\"ConfigurationItemChangeNotification\"}' + rule_parameters_default = '{\"actions\":\"s3:GetObject,s3:PutObject\"}' + + invoking_event_permitted = '{\"configurationItem\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\",\"configurationItemCaptureTime\":\"2016-10-06T16:46:16.261Z\",\"awsAccountId\":\"123456789012\",\"configurationItemStatus\":\"OK\",\"resourceId\":\"AIDAICVB3PKAQMPEGDW2C\",\"resourceName\":\"somerolename\",\"configurationStateMd5Hash\":\"8f1ee69b297895a0f8bc5753eca68e96\",\"resourceCreationTime\":\"2016-10-06T16:46:10.489Z\",\"configurationStateId\":0,\"configurationItemVersion\":\"1.2\",\"ARN\":\"arn:aws:ec2:eu-west-1:123456789012:instance/i-00000000\",\"awsRegion\":\"eu-west-1\",\"availabilityZone\":\"eu-west-1\",\"resourceType\":\"AWS::IAM::Role\",\"tags\":{\"\":\"\"},\"relationships\":[{\"resourceId\":\"eipalloc-00000000\",\"resourceType\":\"AWS::IAM::Role\",\"name\":\"Is attached to ElasticIp\"}],\"configuration\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\"}},\"messageType\":\"ConfigurationItemChangeNotification\"}' + + invoking_event_permitted = '{\"configurationItem\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\",\"configurationItemCaptureTime\":\"2016-10-06T16:46:16.261Z\",\"awsAccountId\":\"123456789012\",\"configurationItemStatus\":\"OK\",\"resourceId\":\"AIDAICVB3PKAQMPEGDW2C\",\"resourceName\":\"somerolename\",\"configurationStateMd5Hash\":\"8f1ee69b297895a0f8bc5753eca68e96\",\"resourceCreationTime\":\"2016-10-06T16:46:10.489Z\",\"configurationStateId\":0,\"configurationItemVersion\":\"1.2\",\"ARN\":\"arn:aws:ec2:eu-west-1:123456789012:instance/i-00000000\",\"awsRegion\":\"eu-west-1\",\"availabilityZone\":\"eu-west-1\",\"resourceType\":\"AWS::IAM::Role\",\"tags\":{\"\":\"\"},\"relationships\":[{\"resourceId\":\"eipalloc-00000000\",\"resourceType\":\"AWS::IAM::Role\",\"name\":\"Is attached to ElasticIp\"}],\"configuration\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\"}},\"messageType\":\"ConfigurationItemChangeNotification\"}' + rule_parameters_permitted_role = '{\"actions\":\"s3:GetObject,s3:PutObject\",\"permittedRoleNames\":\"somerolename\"}' + + invoking_event_compliant_with_resource = '{\"configurationItem\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\",\"configurationItemCaptureTime\":\"2016-10-06T16:46:16.261Z\",\"awsAccountId\":\"123456789012\",\"configurationItemStatus\":\"OK\",\"resourceId\":\"AIDAICVB3PKAQMPEGDW2C\",\"resourceName\":\"somerolename\",\"configurationStateMd5Hash\":\"8f1ee69b297895a0f8bc5753eca68e96\",\"resourceCreationTime\":\"2016-10-06T16:46:10.489Z\",\"configurationStateId\":0,\"configurationItemVersion\":\"1.2\",\"ARN\":\"arn:aws:ec2:eu-west-1:123456789012:instance/i-00000000\",\"awsRegion\":\"eu-west-1\",\"availabilityZone\":\"eu-west-1\",\"resourceType\":\"AWS::IAM::Role\",\"tags\":{\"\":\"\"},\"relationships\":[{\"resourceId\":\"eipalloc-00000000\",\"resourceType\":\"AWS::IAM::Role\",\"name\":\"Is attached to ElasticIp\"}],\"configuration\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\"}},\"messageType\":\"ConfigurationItemChangeNotification\"}' + rule_parameters_with_resource = '{\"actions\":\"s3:GetObject,s3:PutObject\",\"resources\":\"aws:arn:s3:::bucketname,aws:arn:s3:::bucketname2\"}' + + invoking_event_compliant_sans_permitted = '{\"configurationItem\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\",\"configurationItemCaptureTime\":\"2016-10-06T16:46:16.261Z\",\"awsAccountId\":\"123456789012\",\"configurationItemStatus\":\"OK\",\"resourceId\":\"AIDAICVB3PKAQMPEGDW2C\",\"resourceName\":\"somerolename\",\"configurationStateMd5Hash\":\"8f1ee69b297895a0f8bc5753eca68e96\",\"resourceCreationTime\":\"2016-10-06T16:46:10.489Z\",\"configurationStateId\":0,\"configurationItemVersion\":\"1.2\",\"ARN\":\"arn:aws:ec2:eu-west-1:123456789012:instance/i-00000000\",\"awsRegion\":\"eu-west-1\",\"availabilityZone\":\"eu-west-1\",\"resourceType\":\"AWS::IAM::Role\",\"tags\":{\"\":\"\"},\"relationships\":[{\"resourceId\":\"eipalloc-00000000\",\"resourceType\":\"AWS::IAM::Role\",\"name\":\"Is attached to ElasticIp\"}],\"configuration\":{\"roleName\":\"somerolename\",\"arn\":\"arn:aws:iam::111122223333:role/somerolename\"}},\"messageType\":\"ConfigurationItemChangeNotification\"}' + + + iam_get_role = { + "Role": { + "Path": "/", + "RoleName": "somerolename", + "RoleId": "AIDAICVB3PKAQMPEGDW2C", + "Arn": "arn:aws:iam::111122223333:role/somerolename", + "CreateDate": "2021-01-18T14:44:06+00:00", + "AssumeRolePolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "", + "Effect": "Allow", + "Principal": { + "AWS": "arn:aws:iam::111122223333:root" + }, + "Action": "sts:AssumeRole", + } + ] + }, + "MaxSessionDuration": 43200, + "RoleLastUsed": { + "LastUsedDate": "2021-03-01T15:48:03+00:00", + "Region": "eu-west-1" + } + } + } + eval_with_allows_no_resource = { + "EvaluationResults": [ + { + "EvalActionName": "s3:GetObject", + "EvalResourceName": "*", + "EvalDecision": "allowed", + "MatchedStatements": [ + { + "SourcePolicyId": "AdministratorAccess", + "SourcePolicyType": "IAM Policy", + "StartPosition": { + "Line": 3, + "Column": 17 + }, + "EndPosition": { + "Line": 8, + "Column": 6 + } + } + ], + "MissingContextValues": [] + } + ] + } + + eval_with_allows_with_resource = { + "EvaluationResults": [ + { + "EvalActionName": "s3:GetObject", + "EvalResourceName": "arn:aws:s3:::bucket_name", + "EvalDecision": "allowed", + "MatchedStatements": [ + { + "SourcePolicyId": "AdministratorAccess", + "SourcePolicyType": "IAM Policy", + "StartPosition": { + "Line": 3, + "Column": 17 + }, + "EndPosition": { + "Line": 8, + "Column": 6 + } + } + ], + "MissingContextValues": [] + } + ] + } + + eval_implicit_deny_no_resource = { + "EvaluationResults": [ + { + "EvalActionName": "s3:GetObject", + "EvalResourceName": "*", + "EvalDecision": "implicitDeny", + "MatchedStatements": [], + "MissingContextValues": [] + } + ] + } + + eval_implicit_deny_with_resource = { + "EvaluationResults": [ + { + "EvalActionName": "s3:GetObject", + "EvalResourceName": "arn:aws:s3:::bucket_name", + "EvalDecision": "implicitDeny", + "MatchedStatements": [], + "MissingContextValues": [] + } + ] + } + + def test_permitted_compliant(self): + """ + Scenario 1: + Given: IAM Role is the list + Then: Return COMPLIANT + """ + lambda_event = build_lambda_configurationchange_event(invoking_event=self.invoking_event_permitted,rule_parameters=self.rule_parameters_permitted_role) + response = rule.lambda_handler(lambda_event, {}) + resp_expected = [] + resp_expected.append(build_expected_response('COMPLIANT', 'AIDAICVB3PKAQMPEGDW2C', annotation="IAM Role is on Permitted List")) + assert_successful_evaluation(self, response, resp_expected) + + def test_no_resource_non_compliant(self): + """ + Scenario 2: + Given: IAM Role is not in the list (or is empty) + And: The parameter is not set + And: The policies attached to the IAM Role allow the Principal to apply one or more of the on the resources in general + Then: Return NON_COMPLIANT + """ + iam_client_mock.get_role = MagicMock(return_value=self.iam_get_role) + iam_client_mock.simulate_principal_policy = MagicMock(return_value=self.eval_with_allows_no_resource) + lambda_event = build_lambda_configurationchange_event(invoking_event=self.invoking_event_default,rule_parameters=self.rule_parameters_default) + response = rule.lambda_handler(lambda_event, {}) + + resp_expected = [] + resp_expected.append(build_expected_response('NON_COMPLIANT', 'AIDAICVB3PKAQMPEGDW2C', annotation="A IAM Policy \"AdministratorAccess\" attached to the role allows s3:GetObject.")) + assert_successful_evaluation(self, response, resp_expected) + + def test_no_resource_compliant(self): + """ + Scenario 3: + Given: IAM Role is not in the list (or is empty) + And: The parameter is not set + And: The policies attached to the IAM Role do not allow the Principal to apply one or more of the on the resources in general + Then: Return COMPLIANT + """ + iam_client_mock.get_role = MagicMock(return_value=self.iam_get_role) + iam_client_mock.simulate_principal_policy = MagicMock(return_value=self.eval_implicit_deny_no_resource) + lambda_event = build_lambda_configurationchange_event(invoking_event=self.invoking_event_default,rule_parameters=self.rule_parameters_default) + response = rule.lambda_handler(lambda_event, {}) + resp_expected = [] + resp_expected.append(build_expected_response('COMPLIANT', 'AIDAICVB3PKAQMPEGDW2C', annotation="IAM Role is compliant")) + assert_successful_evaluation(self, response, resp_expected) + + def test_resource_set_non_compliant(self): + """ + Scenario 4: + Given: IAM Role is not in the list (or is empty) + And: The parameter is set + And: The policies attached to the IAM Role allow the Principal to apply one or more of the on one or more of the + Then: Return NON_COMPLIANT + """ + iam_client_mock.get_role = MagicMock(return_value=self.iam_get_role) + iam_client_mock.simulate_principal_policy = MagicMock(return_value=self.eval_with_allows_with_resource) + lambda_event = build_lambda_configurationchange_event(invoking_event=self.invoking_event_compliant_with_resource,rule_parameters=self.rule_parameters_with_resource) + response = rule.lambda_handler(lambda_event, {}) + + resp_expected = [] + resp_expected.append(build_expected_response('NON_COMPLIANT', 'AIDAICVB3PKAQMPEGDW2C', annotation="A IAM Policy \"AdministratorAccess\" attached to the role allows s3:GetObject.")) + assert_successful_evaluation(self, response, resp_expected) + + def test_resource_set_compliant(self): + """ + Scenario 5: + Given: IAM Role is not in the list (or is empty) + And: The parameter is set + And: The policies attached to the IAM Role do not allow the Principal to apply one or more of the on one or more of the + Then: Return COMPLIANT + """ + + iam_client_mock.get_role = MagicMock(return_value=self.iam_get_role) + iam_client_mock.simulate_principal_policy = MagicMock(return_value=self.eval_implicit_deny_with_resource) + lambda_event = build_lambda_configurationchange_event(invoking_event=self.invoking_event_compliant_with_resource,rule_parameters=self.rule_parameters_with_resource) + response = rule.lambda_handler(lambda_event, {}) + resp_expected = [] + resp_expected.append(build_expected_response('COMPLIANT', 'AIDAICVB3PKAQMPEGDW2C', annotation="IAM Role is compliant")) + assert_successful_evaluation(self, response, resp_expected) + + +#################### +# Helper Functions # +#################### + +def build_lambda_configurationchange_event(invoking_event, rule_parameters=None): + event_to_return = { + 'configRuleName':'myrule', + 'executionRoleArn':'roleArn', + 'eventLeftScope': False, + 'invokingEvent': invoking_event, + 'accountId': '111122223333', + 'configRuleArn': 'arn:aws:config:us-east-1:123456789012:config-rule/config-rule-8fngan', + 'resultToken':'token' + } + if rule_parameters: + event_to_return['ruleParameters'] = rule_parameters + return event_to_return + +def build_lambda_scheduled_event(rule_parameters=None): + invoking_event = '{"messageType":"ScheduledNotification","notificationCreationTime":"2017-12-23T22:11:18.158Z"}' + event_to_return = { + 'configRuleName':'myrule', + 'executionRoleArn':'roleArn', + 'eventLeftScope': False, + 'invokingEvent': invoking_event, + 'accountId': '111122223333', + 'configRuleArn': 'arn:aws:config:us-east-1:123456789012:config-rule/config-rule-8fngan', + 'resultToken':'token' + } + if rule_parameters: + event_to_return['ruleParameters'] = rule_parameters + return event_to_return + +def build_expected_response(compliance_type, compliance_resource_id, compliance_resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): + if not annotation: + return { + 'ComplianceType': compliance_type, + 'ComplianceResourceId': compliance_resource_id, + 'ComplianceResourceType': compliance_resource_type + } + return { + 'ComplianceType': compliance_type, + 'ComplianceResourceId': compliance_resource_id, + 'ComplianceResourceType': compliance_resource_type, + 'Annotation': annotation + } + +def assert_successful_evaluation(testClass, response, resp_expected, evaluations_count=1): + if isinstance(response, dict): + testClass.assertEquals(resp_expected['ComplianceResourceType'], response['ComplianceResourceType']) + testClass.assertEquals(resp_expected['ComplianceResourceId'], response['ComplianceResourceId']) + testClass.assertEquals(resp_expected['ComplianceType'], response['ComplianceType']) + testClass.assertTrue(response['OrderingTimestamp']) + if 'Annotation' in resp_expected or 'Annotation' in response: + testClass.assertEquals(resp_expected['Annotation'], response['Annotation']) + elif isinstance(response, list): + testClass.assertEquals(evaluations_count, len(response)) + for i, response_expected in enumerate(resp_expected): + testClass.assertEquals(response_expected['ComplianceResourceType'], response[i]['ComplianceResourceType']) + testClass.assertEquals(response_expected['ComplianceResourceId'], response[i]['ComplianceResourceId']) + testClass.assertEquals(response_expected['ComplianceType'], response[i]['ComplianceType']) + testClass.assertTrue(response[i]['OrderingTimestamp']) + if 'Annotation' in response_expected or 'Annotation' in response[i]: + testClass.assertEquals(response_expected['Annotation'], response[i]['Annotation']) + +def assert_customer_error_response(testClass, response, customerErrorCode=None, customerErrorMessage=None): + if customerErrorCode: + testClass.assertEqual(customerErrorCode, response['customerErrorCode']) + if customerErrorMessage: + testClass.assertEqual(customerErrorMessage, response['customerErrorMessage']) + testClass.assertTrue(response['customerErrorCode']) + testClass.assertTrue(response['customerErrorMessage']) + if "internalErrorMessage" in response: + testClass.assertTrue(response['internalErrorMessage']) + if "internalErrorDetails" in response: + testClass.assertTrue(response['internalErrorDetails']) + +def sts_mock(): + assume_role_response = { + "Credentials": { + "AccessKeyId": "string", + "SecretAccessKey": "string", + "SessionToken": "string"}} + sts_client_mock.reset_mock(return_value=True) + sts_client_mock.assume_role = MagicMock(return_value=assume_role_response) diff --git a/python/IAM_TEST_ACTIONS/IAM_TEST_ACTIONS.py b/python/IAM_TEST_ACTIONS/IAM_TEST_ACTIONS.py new file mode 100644 index 00000000..ddcb586a --- /dev/null +++ b/python/IAM_TEST_ACTIONS/IAM_TEST_ACTIONS.py @@ -0,0 +1,428 @@ +# Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may +# not use this file except in compliance with the License. A copy of the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for +# the specific language governing permissions and limitations under the License. +''' +Description + Check if and IAM Roles can execute API actions on account resources +Trigger + Configuration Change on AWS::IAM::Role +Reports on: + AWS::IAM::Role +Rule Parameters: + (required) actions - IAM actions to be tested (comma delimited list) + (optional) resourceArns - AWS resource ARNs to be targeted by IAM actions (comma delimited list) + (optional) permittedRoleNames - Names for IAM Roles that are permitted to have these actions (comma delimited list) +Scenarios: + Scenario 1: + Given: IAM Role does not have permissions that allow the + Then: Return NOT_APPLICABLE + Scenario 2: + Given: IAM Role is the list + Then: Return COMPLIANT + Scenario 3: + Given: IAM Role is not in the list (or is empty) + And: The parameter is not set + And: The policies attached to the IAM Role allow the Principal to apply one or more of the on the resources in general + Then: Return NON_COMPLIANT + Scenario 4: + Given: IAM Role is not in the list (or is empty) + And: The parameter is not set + And: The policies attached to the IAM Role do not allow the Principal to apply one or more of the on the resources in general + Then: Return COMPLIANT + Scenario 5: + Given: IAM Role is not in the list (or is empty) + And: The parameter is set + And: The policies attached to the IAM Role allow the Principal to apply one or more of the on one or more of the + Then: Return NON_COMPLIANT + Scenario 6: + Given: IAM Role is not in the list (or is empty) + And: The parameter is set + And: The policies attached to the IAM Role do not allow the Principal to apply one or more of the on one or more of the + Then: Return COMPLIANT +''' +import json +import datetime +import boto3 +import botocore + +############## +# Parameters # +############## + +# Define the default resource to report to Config Rules +DEFAULT_RESOURCE_TYPE = 'AWS::IAM::Role' + +# Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). +ASSUME_ROLE_MODE = False + +############# +# Main Code # +############# + +def should_ignore_config_item(config_item, permitted_roles): + return (config_item['resourceType'] == 'AWS::IAM::Role' and config_item['resourceName'] in permitted_roles) \ + or (config_item['ARN'].rsplit("/")[1] == 'aws-service-role') + +def evaluate_compliance(event, configuration_item, valid_rule_parameters): + """Form the evaluation(s) to be return to Config Rules + Return either: + None -- when no result needs to be displayed + a string -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + a dictionary -- the evaluation dictionary, usually built by build_evaluation_from_config_item() + a list of dictionary -- a list of evaluation dictionary , usually built by build_evaluation() + Keyword arguments: + event -- the event variable given in the lambda handler + configuration_item -- the configurationItem dictionary in the invokingEvent + valid_rule_parameters -- the output of the evaluate_parameters() representing validated parameters of the Config Rule + Advanced Notes: + 1 -- if a resource is deleted and generate a configuration change with ResourceDeleted status, the Boilerplate code will put a NOT_APPLICABLE on this resource automatically. + 2 -- if a None or a list of dictionary is returned, the old evaluation(s) which are not returned in the new evaluation list are returned as NOT_APPLICABLE by the Boilerplate code + 3 -- if None or an empty string, list or dict is returned, the Boilerplate code will put a "shadow" evaluation to feedback that the evaluation took place properly + """ + + if 'resourceType' not in configuration_item or configuration_item['resourceType'] != 'AWS::IAM::Role': + return build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE', 'Config Item type is not an IAM Role') + + role_name = configuration_item['configuration']['roleName'] + role_arn = configuration_item['configuration']['arn'] + + if "permittedRoleNames" in valid_rule_parameters and role_name in valid_rule_parameters['permittedRoleNames']: + return build_evaluation_from_config_item(configuration_item, 'COMPLIANT', 'IAM Role is on Permitted List') + + if "resourceArns" in valid_rule_parameters: + resource_arns = valid_rule_parameters['resourceArns'] + else: + resource_arns = ["*"] + + actions = valid_rule_parameters['actions'] + + iam_client = get_client('iam', event) + + # Test IAM Policies + + if resource_arns == ["*"]: + test_results = iam_client.simulate_principal_policy( + PolicySourceArn = role_arn, + ActionNames = actions) + else: + test_results = iam_client.simulate_principal_policy( + PolicySourceArn = role_arn, + ActionNames = actions, + ResourceArns = resource_arns) + + for test_result in test_results['EvaluationResults']: + if test_result['EvalDecision'] == 'allowed': + return build_evaluation_from_config_item(configuration_item, "NON_COMPLIANT", annotation='A ' + test_result['MatchedStatements'][0]['SourcePolicyType'] + ' "' + test_result['MatchedStatements'][0]['SourcePolicyId'] + '" attached to the role allows ' + test_result['EvalActionName'] + '.') + + return build_evaluation_from_config_item(configuration_item, 'COMPLIANT', annotation = 'IAM Role is compliant') + +def evaluate_parameters(rule_parameters): + """Evaluate the rule parameters dictionary validity. Raise a ValueError for invalid parameters. + Return: + anything suitable for the evaluate_compliance() + Keyword arguments: + rule_parameters -- the Key/Value dictionary of the Config Rules parameters + """ + + if 'actions' not in rule_parameters: + raise ValueError("Rule must have 'actions' set as a comma separated list of IAM actions to be tested.") + valid_rule_parameters = {} + valid_rule_parameters['actions'] = rule_parameters['actions'].split(",") + if 'permittedRoleNames' in rule_parameters: + valid_rule_parameters['permittedRoleNames'] = rule_parameters['permittedRoleNames'].split(",") + if 'resourceArns' in rule_parameters: + valid_rule_parameters['resourceArns'] = rule_parameters['resourceArns'].split(",") + + return valid_rule_parameters + +#################### +# Helper Functions # +#################### + +# Build an error to be displayed in the logs when the parameter is invalid. +def build_parameters_value_error_response(ex): + """Return an error dictionary when the evaluate_parameters() raises a ValueError. + Keyword arguments: + ex -- Exception text + """ + return build_error_response(internalErrorMessage="Parameter value is invalid", + internalErrorDetails="An ValueError was raised during the validation of the Parameter value", + customerErrorCode="InvalidParameterValueException", + customerErrorMessage=str(ex)) + +# This gets the client after assuming the Config service role +# either in the same AWS account or cross-account. +def get_client(service, event): + """Return the service boto client. It should be used instead of directly calling the client. + Keyword arguments: + service -- the service name used for calling the boto.client() + event -- the event variable given in the lambda handler + """ + if not ASSUME_ROLE_MODE: + return boto3.client(service) + credentials = get_assume_role_credentials(event["executionRoleArn"]) + return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'] + ) + +# This generate an evaluation for config +def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): + """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. + Keyword arguments: + resource_id -- the unique id of the resource to report + compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + event -- the event variable given in the lambda handler + resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) + annotation -- an annotation to be added to the evaluation (default None) + """ + eval_cc = {} + if annotation: + eval_cc['Annotation'] = annotation + eval_cc['ComplianceResourceType'] = resource_type + eval_cc['ComplianceResourceId'] = resource_id + eval_cc['ComplianceType'] = compliance_type + eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) + return eval_cc + +def build_evaluation_from_config_item(configuration_item, compliance_type, annotation=None): + """Form an evaluation as a dictionary. Usually suited to report on configuration change rules. + Keyword arguments: + configuration_item -- the configurationItem dictionary in the invokingEvent + compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + annotation -- an annotation to be added to the evaluation (default None) + """ + eval_ci = {} + if annotation: + eval_ci['Annotation'] = annotation + eval_ci['ComplianceResourceType'] = configuration_item['resourceType'] + eval_ci['ComplianceResourceId'] = configuration_item['resourceId'] + eval_ci['ComplianceType'] = compliance_type + eval_ci['OrderingTimestamp'] = configuration_item['configurationItemCaptureTime'] + return eval_ci + +#################### +# Boilerplate Code # +#################### + +# Helper function used to validate input +def check_defined(reference, reference_name): + if not reference: + raise Exception('Error: ', reference_name, 'is not defined') + return reference + +# Check whether the message is OversizedConfigurationItemChangeNotification or not +def is_oversized_changed_notification(message_type): + check_defined(message_type, 'messageType') + return message_type == 'OversizedConfigurationItemChangeNotification' + +# Check whether the message is a ScheduledNotification or not. +def is_scheduled_notification(message_type): + check_defined(message_type, 'messageType') + return message_type == 'ScheduledNotification' + +# Get configurationItem using getResourceConfigHistory API +# in case of OversizedConfigurationItemChangeNotification +def get_configuration(resource_type, resource_id, configuration_capture_time): + result = AWS_CONFIG_CLIENT.get_resource_config_history( + resourceType=resource_type, + resourceId=resource_id, + laterTime=configuration_capture_time, + limit=1) + configurationItem = result['configurationItems'][0] + return convert_api_configuration(configurationItem) + +# Convert from the API model to the original invocation model +def convert_api_configuration(configurationItem): + for k, v in configurationItem.items(): + if isinstance(v, datetime.datetime): + configurationItem[k] = str(v) + configurationItem['awsAccountId'] = configurationItem['accountId'] + configurationItem['ARN'] = configurationItem['arn'] + configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] + configurationItem['configurationItemVersion'] = configurationItem['version'] + configurationItem['configuration'] = json.loads(configurationItem['configuration']) + if 'relationships' in configurationItem: + for i in range(len(configurationItem['relationships'])): + configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] + return configurationItem + +# Based on the type of message get the configuration item +# either from configurationItem in the invoking event +# or using the getResourceConfigHistiry API in getConfiguration function. +def get_configuration_item(invokingEvent): + check_defined(invokingEvent, 'invokingEvent') + if is_oversized_changed_notification(invokingEvent['messageType']): + configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') + return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) + elif is_scheduled_notification(invokingEvent['messageType']): + return None + return check_defined(invokingEvent['configurationItem'], 'configurationItem') + +# Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. +def is_applicable(configurationItem, event): + try: + check_defined(configurationItem, 'configurationItem') + check_defined(event, 'event') + except: + return True + status = configurationItem['configurationItemStatus'] + eventLeftScope = event['eventLeftScope'] + if status == 'ResourceDeleted': + print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") + return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope + +def get_assume_role_credentials(role_arn): + sts_client = boto3.client('sts') + try: + assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") + return assume_role_response['Credentials'] + except botocore.exceptions.ClientError as ex: + # Scrub error message for any internal account info leaks + print(str(ex)) + if 'AccessDenied' in ex.response['Error']['Code']: + ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." + else: + ex.response['Error']['Message'] = "InternalError" + ex.response['Error']['Code'] = "InternalError" + raise ex + +# This removes older evaluation (usually useful for periodic rule not reporting on AWS::::Account). +def clean_up_old_evaluations(latest_evaluations, event): + + cleaned_evaluations = [] + + old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule( + ConfigRuleName=event['configRuleName'], + ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'], + Limit=100) + + old_eval_list = [] + + while True: + for old_result in old_eval['EvaluationResults']: + old_eval_list.append(old_result) + if 'NextToken' in old_eval: + next_token = old_eval['NextToken'] + old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule( + ConfigRuleName=event['configRuleName'], + ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'], + Limit=100, + NextToken=next_token) + else: + break + + for old_eval in old_eval_list: + old_resource_id = old_eval['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId'] + newer_founded = False + for latest_eval in latest_evaluations: + if old_resource_id == latest_eval['ComplianceResourceId']: + newer_founded = True + if not newer_founded: + cleaned_evaluations.append(build_evaluation(old_resource_id, "NOT_APPLICABLE", event)) + + return cleaned_evaluations + latest_evaluations + +# This decorates the lambda_handler in rule_code with the actual PutEvaluation call +def lambda_handler(event, context): + + global AWS_CONFIG_CLIENT + + check_defined(event, 'event') + #print(event) + invoking_event = json.loads(event['invokingEvent']) + #print(json.dumps(event,indent=1,sort_keys=True)) + + rule_parameters = {} + if 'ruleParameters' in event: + rule_parameters = json.loads(event['ruleParameters']) + + try: + valid_rule_parameters = evaluate_parameters(rule_parameters) + except ValueError as ex: + return build_parameters_value_error_response(ex) + + try: + AWS_CONFIG_CLIENT = get_client('config', event) + if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'ScheduledNotification', 'OversizedConfigurationItemChangeNotification']: + configuration_item = get_configuration_item(invoking_event) + if is_applicable(configuration_item, event): + compliance_result = evaluate_compliance(event, configuration_item, valid_rule_parameters) + else: + compliance_result = "NOT_APPLICABLE" + else: + return build_internal_error_response('Unexpected message type', str(invoking_event)) + except botocore.exceptions.ClientError as ex: + if is_internal_error(ex): + return build_internal_error_response("Unexpected error while completing API request", str(ex)) + return build_error_response("Customer error while making API request", str(ex), ex.response['Error']['Code'], ex.response['Error']['Message']) + except ValueError as ex: + return build_internal_error_response(str(ex), str(ex)) + + evaluations = [] + latest_evaluations = [] + + if not compliance_result: + latest_evaluations.append(build_evaluation(event['accountId'], "NOT_APPLICABLE", event, resource_type='AWS::::Account')) + evaluations = clean_up_old_evaluations(latest_evaluations, event) + elif isinstance(compliance_result, str): + if configuration_item: + evaluations.append(build_evaluation_from_config_item(configuration_item, compliance_result)) + else: + evaluations.append(build_evaluation(event['accountId'], compliance_result, event, resource_type=DEFAULT_RESOURCE_TYPE)) + elif isinstance(compliance_result, list): + for evaluation in compliance_result: + missing_fields = False + for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'): + if field not in evaluation: + print("Missing " + field + " from custom evaluation.") + missing_fields = True + + if not missing_fields: + latest_evaluations.append(evaluation) + evaluations = clean_up_old_evaluations(latest_evaluations, event) + elif isinstance(compliance_result, dict): + missing_fields = False + for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'): + if field not in compliance_result: + print("Missing " + field + " from custom evaluation.") + missing_fields = True + if not missing_fields: + evaluations.append(compliance_result) + else: + evaluations.append(build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE')) + + # Put together the request that reports the evaluation status + resultToken = event['resultToken'] + testMode = False + if resultToken == 'TESTMODE': + # Used solely for RDK test to skip actual put_evaluation API call + testMode = True + # Invoke the Config API to report the result of the evaluation + AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=resultToken, TestMode=testMode) + # Used solely for RDK test to be able to test Lambda function + return evaluations + +def is_internal_error(exception): + return ((not isinstance(exception, botocore.exceptions.ClientError)) or exception.response['Error']['Code'].startswith('5') + or 'InternalError' in exception.response['Error']['Code'] or 'ServiceError' in exception.response['Error']['Code']) + +def build_internal_error_response(internalErrorMessage, internalErrorDetails=None): + return build_error_response(internalErrorMessage, internalErrorDetails, 'InternalError', 'InternalError') + +def build_error_response(internalErrorMessage, internalErrorDetails=None, customerErrorCode=None, customerErrorMessage=None): + error_response = { + 'internalErrorMessage': internalErrorMessage, + 'internalErrorDetails': internalErrorDetails, + 'customerErrorMessage': customerErrorMessage, + 'customerErrorCode': customerErrorCode + } + print(error_response) + return error_response diff --git a/python/IAM_TEST_ACTIONS/parameter.json b/python/IAM_TEST_ACTIONS/parameter.json new file mode 100644 index 00000000..baf52568 --- /dev/null +++ b/python/IAM_TEST_ACTIONS/parameter.json @@ -0,0 +1,15 @@ +{ + "Version": "1.0", + "Parameters": { + "CodeKey": "IAM_TEST_ACTIONS.zip", + "SourceRuntime": "python3.7", + "RuleName": "IAM_TEST_ACTIONS", + "OptionalParameters": "{permittedRoleNames,resourceArns}", + "InputParameters": "{actions}", + "SourceEvents": "AWS::IAM::Role", + "RuleSets": [ + "iam" + ] + }, + "Tags": "[]" + } \ No newline at end of file