Skip to content

Commit

Permalink
Import cloudwatch logs resource policies (#8)
Browse files Browse the repository at this point in the history
* Add resource policies to log groups

* Move mapper functions to their own file
  • Loading branch information
gsoltis authored Mar 7, 2021
1 parent 526bf2d commit 2e3775e
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 189 deletions.
94 changes: 92 additions & 2 deletions introspector/aws/logs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from collections import defaultdict
import json
import logging
from typing import Any, Dict, Iterator, Tuple
from typing import Any, Dict, List, Iterator, Iterable, Tuple

from introspector.aws.fetch import ServiceProxy
from sqlalchemy.orm import Session

from introspector import ImportWriter, PathStack
from introspector.aws.fetch import Proxy, ServiceProxy
from introspector.aws.mapper_fns import policy_statement
from introspector.aws.region import RegionCache
from introspector.aws.svc import RegionalService, ServiceSpec, resource_gate
from introspector.models import ImportJob, Resource

_log = logging.getLogger(__name__)

Expand All @@ -26,6 +34,88 @@ def _import_log_groups(proxy: ServiceProxy):
yield 'LogGroup', _import_log_group(proxy, group_data)


def normalize_resource_policies(policies: List) -> Dict[str, List[Any]]:
  """Explode CloudWatch Logs resource policies into per-resource statements.

  Each policy document's statements are split so that every emitted
  statement applies to exactly one entry of the original statement's
  ``Resource`` list. The synthesized ``Sid`` combines the policy name and
  the original Sid so statements from different policies do not collide.

  Returns a map from resource ARN (or ARN pattern) to the list of
  synthesized statements that mention it.
  """
  resource_statements_map: Dict[str, List[Any]] = defaultdict(list)
  for policy_data in policies:
    name = policy_data.get('policyName', 'unnamed')
    document = json.loads(policy_data.get('policyDocument', '{}'))
    for statement in document.get('Statement', []):
      statement_common = policy_statement(statement)
      sid = statement_common.get('Sid', 'nosid')
      for resource in statement_common.get('Resource', []):
        # Copy everything except Resource/Sid, then pin this statement to
        # the single resource with a per-policy-qualified Sid.
        to_add = {
            key: value
            for key, value in statement_common.items()
            if key not in ('Resource', 'Sid')
        }
        to_add['Resource'] = [resource]
        to_add['Sid'] = '_'.join([name, sid])
        resource_statements_map[resource].append(to_add)
  # Return a plain dict to keep the original return contract exact.
  return dict(resource_statements_map)


def _import_resource_policies(proxy: ServiceProxy) -> Dict[str, List[Any]]:
  """Fetch this region's Logs resource policies, keyed by resource ARN.

  Returns an empty mapping when the API call yields nothing.
  """
  policies_resp = proxy.list('describe_resource_policies')
  if policies_resp is None:
    return {}
  raw_policies = policies_resp[1].get('resourcePolicies', [])
  return normalize_resource_policies(raw_policies)


def _log_group_uris_by_prefix(db: Session, provider_account_id: int,
                              account_id: str, region: str,
                              prefix: str) -> Iterable[str]:
  """Find imported LogGroup uris covered by a policy Resource ARN pattern.

  ``prefix`` comes from a resource policy statement and may use ``*`` as a
  wildcard and omit trailing ARN segments (including the account id).

  Returns an iterable of matching LogGroup resource uris, or an empty
  list when the pattern cannot match this account/region at all.
  """
  # Escape SQL LIKE metacharacters first: log group names commonly contain
  # '_', which LIKE would otherwise treat as a single-character wildcard
  # (and a literal '%' would match anything). Then map the ARN wildcard
  # '*' onto LIKE's '%'.
  escaped = prefix.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
  prefix_with_wildcards = escaped.replace('*', '%')
  # if the prefix has a spot for an account id, fill it in
  parts = prefix_with_wildcards.split(':')
  if len(parts) < 5:
    prefix_parts = ['arn', 'aws', 'logs', region, account_id, '%']
    for incoming, expected in zip(parts, prefix_parts):
      if incoming != expected and incoming != '%':
        # A concrete, non-matching segment: nothing here can match.
        return []
    # TODO: partition
    parts = ['arn', 'aws', 'logs', region, account_id, '%']
  else:
    parts[4] = account_id
  # Handle the case where the resource is 'arn:aws:logs:region:*'
  if len(parts) == 5:
    parts.append('%')
  resolved_prefix = ':'.join(parts)
  return map(
      lambda row: row[0],
      db.query(Resource.uri).filter(
          Resource.provider_account_id == provider_account_id,
          Resource.service == 'logs',
          Resource.provider_type == 'LogGroup',
          # escape='\\' makes the escaped '%'/'_' above literal.
          Resource.uri.like(resolved_prefix, escape='\\')).all())


def _make_policy(statements):
return {'Version': '2012-10-17', 'Statement': statements}


def add_logs_resource_policies(db: Session, proxy: Proxy,
                               region_cache: RegionCache, writer: ImportWriter,
                               import_job: ImportJob, ps: PathStack,
                               account_id: str):
  """Attach CloudWatch Logs resource policies to the log groups they cover.

  For every region offering the logs service: fetch the account's resource
  policies, match each policy statement's resource pattern against the
  already-imported LogGroup uris, and emit one synthesized
  'ResourcePolicy' record per covered log group.
  """
  for region in region_cache.regions_for_service('logs'):
    logs_proxy = proxy.service('logs', region)
    policies = _import_resource_policies(logs_proxy)
    # log group uri -> accumulated statements from all matching policies
    synthesized = defaultdict(list)
    for prefix, statements in policies.items():
      for log_group_uri in _log_group_uris_by_prefix(
          db, import_job.provider_account_id, account_id, region, prefix):
        synthesized[log_group_uri] += statements
    for uri, statements in synthesized.items():
      policy = _make_policy(statements)
      writer(ps, 'ResourcePolicy', {
          'Policy': policy,
          'arn': uri
      }, {'region': region})


def _import_logs_region(proxy: ServiceProxy, region: str,
spec: ServiceSpec) -> Iterator[Tuple[str, Any]]:
if resource_gate(spec, 'LogGroup'):
Expand Down
224 changes: 38 additions & 186 deletions introspector/aws/map.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import logging
import os
import re
from typing import Any, Dict, List, Optional, Union

from sqlalchemy.orm import Session

Expand All @@ -10,7 +8,10 @@
from introspector.aws.ec2_adjunct import find_adjunct_data
from introspector.aws.fetch import Proxy
from introspector.aws.iam import synthesize_account_root
from introspector.aws.logs import add_logs_resource_policies
from introspector.aws.mapper_fns import get_mapper_fns
from introspector.aws.svc import ImportSpec, resource_gate, service_gate
from introspector.aws.region import RegionCache
from introspector.aws.uri import get_arn_fn
from introspector.delta.partial import map_partial_deletes, map_partial_prefix
from introspector.delta.resource import map_relation_deletes, map_resource_deletes, map_resource_prefix, map_resource_relations
Expand All @@ -20,172 +21,6 @@

_log = logging.getLogger(__name__)

def _zone_to_region(zone: str, **_) -> str:
return zone[:-1]


_KEY_ATTRS = ['Key', 'key', 'TagKey']


def _aws_tag_key(item: Dict) -> str:
for attr in _KEY_ATTRS:
key = item.get(attr)
if key is not None:
return key
raise GFError(f'Cannot find tag key in {item}')


_VALUE_ATTRS = ['Value', 'value', 'TagValue']


def _aws_tag_value(item: Dict) -> str:
for attr in _VALUE_ATTRS:
value = item.get(attr)
if value is not None:
return value
raise GFError(f'Cannot find tag value in {item}')


def _tag_list_to_object(tags: Optional[List[Dict[str, str]]],
                        **_) -> Dict[str, str]:
  """Convert an AWS tag list to a plain {key: value} mapping.

  None or an empty list yields an empty dict.
  """
  if not tags:
    return {}
  result: Dict[str, str] = {}
  for item in tags:
    result[_aws_tag_key(item)] = _aws_tag_value(item)
  return result


def _lambda_alias_relations(parent_uri, target_raw, **kwargs):
initial_version = target_raw['FunctionVersion']
fn_arn = target_raw['FunctionArn']

def version_arn(v: str) -> str:
return f'{fn_arn}:{v}'

version_total = 0
weights = target_raw.get('RoutingConfig', {}).get('AdditionalVersionWeights',
{})
for version, weight in weights.items():
version_total += weight
target_uri = version_arn(version)
yield parent_uri, 'forwards-to', version_arn(version), [{
'name': 'weight',
'value': weight
}]
remaining = 1.0 - version_total
target_uri = version_arn(initial_version)
yield parent_uri, 'forwards-to', version_arn(initial_version), [{
'name':
'weight',
'value':
remaining
}]

def _arrayize(inval: Union[str, List[str]]) -> List[str]:
if isinstance(inval, str):
return [inval]
return sorted(inval)

ALL_DIGITS = re.compile(r'^[0-9]{10}[0-9]*$')
def _normalize_principal_map(raw: Union[str, Dict[str, Any]]) -> Dict[str, List[Any]]:
result = {}
if not isinstance(raw, dict):
if raw != '*':
_log.warn(f'Invalid string literal principal {raw}')
return {}
else:
return {'AWS': ['*']}
for key, value in raw.items():
values = _arrayize(value)
if key == 'AWS':
# normalize account ids
principals = []
for principal in values:
if ':' not in principal and ALL_DIGITS.match(principal) is not None:
principals.append(f'arn:aws:iam::{principal}:root')
else:
principals.append(principal)
values = principals
result[key] = values
return result

EFFECTS = {
'allow': 'Allow',
'deny': 'Deny'
}
def _policy_statement(raw: Dict[str, Any]) -> Dict[str, Any]:
result = {}
lc = {k.lower(): v for k, v in raw.items()}
def _normalize(s: str, fn):
val = lc.get(s.lower())
if val is not None:
result[s] = fn(val)
sid = lc.get('sid')
if sid is not None:
result['Sid'] = sid
_normalize('Principal', _normalize_principal_map)
_normalize('NotPrincipal', _normalize_principal_map)
result['Effect'] = EFFECTS[lc['effect'].lower()]
_normalize('Action', _arrayize)
_normalize('NotAction', _arrayize)
_normalize('Resource', _arrayize)
_normalize('NotResource', _arrayize)
condition = lc.get('condition')
if condition is not None:
# TODO: deeper normalization
result['Condition'] = condition
return result


EMPTY_POLICY = {
'Version': '2012-10-17',
'Statement': []
}
def _policy(policy: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if policy is None:
return EMPTY_POLICY
result = {}
lc = {k.lower(): v for k, v in policy.items()}
result['Version'] = lc.get('version', '2012-10-17')
policy_id = lc.get('id')
if policy_id is not None:
result['Id'] = policy_id
result['Statement'] = [_policy_statement(s) for s in lc.get('statement', [])]
return result

def _policy_map(m: Optional[Dict[str, Dict[str, Any]]]) -> Dict[str, Any]:
if m is None or len(m) == 0:
return EMPTY_POLICY
policies = [_policy(policy) for policy in m.values()]
statements = []
for policy in policies:
statements += policy.get('Statement', [])
return {
'Version': '2012-10-17',
'Id': 'Synthesized from map',
'Statement': statements
}

# Registry of AWS-specific transform functions, keyed by the name used in
# the transform definitions (presumably resolved by the Mapper via the
# fns built in _get_mapper — verify against Mapper's lookup).
AWS_TRANSFORMS = {
    'aws_zone_to_region': _zone_to_region,
    'aws_tags': _tag_list_to_object,
    'aws_lambda_alias': _lambda_alias_relations,
    'aws_policy': _policy,
    'aws_policy_map': _policy_map
}


def _get_aws_not_in_org(import_job: ImportJob):
# TODO: rewrite this. pretty sure we have the list of accounts
account_paths = import_job.configuration['aws_graph']['accounts']

def _aws_not_in_org(account_id: str, **kwargs) -> bool:
for accounts in account_paths.values():
for account in accounts:
if account['Id'] == account_id:
return False
return True

return _aws_not_in_org


class AWSDivisionURI(DivisionURI):
def __init__(self,
Expand Down Expand Up @@ -234,11 +69,13 @@ def _get_mapper(import_job: ImportJob,
org_config['Id'])
transform_path = os.path.join(os.path.dirname(__file__), 'transforms')
transforms = load_transforms(transform_path)
fns = AWS_TRANSFORMS.copy()
fns['aws_not_in_org'] = _get_aws_not_in_org(import_job)
if extra_fns is not None:
for fn, impl in extra_fns.items():
fns[fn] = impl
account_paths = import_job.configuration['aws_graph']['accounts']
account_ids = []
for accounts in account_paths.values():
for account in accounts:
account_ids.append(account['Id'])
fns = get_mapper_fns(account_ids, extra_fns)

return Mapper(transforms,
import_job.provider_account_id,
division_uri,
Expand All @@ -247,7 +84,7 @@ def _get_mapper(import_job: ImportJob,


# Everything has a 'base' source, these are extra
AWS_SOURCES = ['credentialreport']
AWS_SOURCES = ['credentialreport', 'logspolicies']


def map_import(db: Session, import_job_id: int, partition: str,
Expand All @@ -257,12 +94,6 @@ def map_import(db: Session, import_job_id: int, partition: str,
raise GFInternal('Lost ImportJob')
ps = PathStack.from_import_job(import_job)
mapper = _get_mapper(import_job)
adjunct_writer = db_import_writer(db,
import_job.id,
import_job.provider_account_id,
'ec2',
phase=1,
source='base')
gate = service_gate(spec)
for path, account in account_paths_for_import(db, import_job):
uri_fn = get_arn_fn(account.scope, partition)
Expand All @@ -272,20 +103,41 @@ def map_import(db: Session, import_job_id: int, partition: str,
if gate('iam') is not None:
boto = load_boto_session(account)
proxy = Proxy.build(boto)
synthesize_account_root(proxy, db, import_job, import_job.path_prefix, account.scope,
partition)
for source in AWS_SOURCES:
map_partial_prefix(db, mapper, import_job, source,
import_job.path_prefix, uri_fn)
map_partial_deletes(db, import_job, source, spec)
synthesize_account_root(proxy, db, import_job, import_job.path_prefix,
account.scope, partition)
ec2_spec = gate('ec2')
if ec2_spec is not None and resource_gate(ec2_spec, 'Images'):
# Additional ec2 work
if boto is None or proxy is None:
boto = load_boto_session(account)
proxy = Proxy.build(boto)
adjunct_writer = db_import_writer(db,
import_job.id,
import_job.provider_account_id,
'ec2',
phase=1,
source='base')
find_adjunct_data(db, proxy, adjunct_writer, import_job, ps, import_job)

logs_spec = gate('logs')
if logs_spec is not None and resource_gate(logs_spec, 'ResourcePolicies'):
if boto is None or proxy is None:
boto = load_boto_session(account)
proxy = Proxy.build(boto)
region_cache = RegionCache(boto, partition)
adjunct_writer = db_import_writer(db,
import_job.id,
import_job.provider_account_id,
'logs',
phase=1,
source='logspolicies')
add_logs_resource_policies(db, proxy, region_cache, adjunct_writer,
import_job, ps, account.scope)

for source in AWS_SOURCES:
map_partial_prefix(db, mapper, import_job, source,
import_job.path_prefix, uri_fn)
map_partial_deletes(db, import_job, source, spec)
# Re-map anything we've added
map_resource_prefix(db, import_job, import_job.path_prefix, mapper, uri_fn)

Expand Down
Loading

0 comments on commit 2e3775e

Please sign in to comment.