1
1
from typing import NamedTuple
2
2
import datetime
3
3
import boto3
4
+ import re
5
+ from ipaddress import ip_network , ip_address
6
+
7
+
8
+ VPCE_REGEX = re .compile (r'(?<=sourcevpce")(\s*:\s*")(vpce-[a-zA-Z0-9]+)' , re .DOTALL )
9
+ SOURCE_IP_ADDRESS_REGEX = re .compile (r'(?<=sourceip")(\s*:\s*")([a-fA-F0-9.:/%]+)' , re .DOTALL )
4
10
5
11
6
12
class bcolors :
@@ -19,6 +25,9 @@ class VpcOptions(NamedTuple):
19
25
vpc_id : str
20
26
region_name : str
21
27
28
+ def client (self , service_name : str ):
29
+ return self .session .client (service_name , region_name = self .region_name )
30
+
22
31
23
32
def generate_session (profile_name ):
24
33
try :
@@ -29,14 +38,83 @@ def generate_session(profile_name):
29
38
30
39
31
40
def exit_critical (message ):
32
- print ( bcolors . colors . get ( 'FAIL' ), message , bcolors . colors . get ( 'ENDC' ), sep = "" )
41
+ log_critical ( message )
33
42
raise SystemExit
34
43
35
44
45
+ def log_critical (message ):
46
+ print (bcolors .colors .get ('FAIL' ), message , bcolors .colors .get ('ENDC' ), sep = "" )
47
+
48
+
36
49
def message_handler (message , position ):
37
50
print (bcolors .colors .get (position ), message , bcolors .colors .get ('ENDC' ), sep = "" )
38
51
39
52
40
53
def datetime_to_string (o ):
41
54
if isinstance (o , datetime .datetime ):
42
55
return o .__str__ ()
56
+
57
+
58
+ def check_ipvpc_inpolicy (document , vpc_options : VpcOptions ):
59
+ document = document .replace ("\\ " , "" ).lower ()
60
+
61
+ """ Checking if VPC is inside document, it's a 100% true information """
62
+ if vpc_options .vpc_id in document :
63
+ return "direct VPC reference"
64
+ else :
65
+ """
66
+ Vpc_id not found, trying to discover if it's a potencial subnet IP or VPCE is allowed
67
+ """
68
+ if "aws:sourcevpce" in document :
69
+
70
+ """ Get VPCE found """
71
+ aws_sourcevpces = []
72
+ for vpce_tuple in VPCE_REGEX .findall (document ):
73
+ aws_sourcevpces .append (vpce_tuple [1 ])
74
+
75
+ """ Get all VPCE of this VPC """
76
+ ec2 = vpc_options .client ('ec2' )
77
+
78
+ filters = [{'Name' : 'vpc-id' ,
79
+ 'Values' : [vpc_options .vpc_id ]}]
80
+
81
+ vpc_endpoints = ec2 .describe_vpc_endpoints (Filters = filters )
82
+
83
+ """ iterate VPCEs found found """
84
+ if len (vpc_endpoints ['VpcEndpoints' ]) > 0 :
85
+ matching_vpces = []
86
+ """ Iterate VPCE to match vpce in Policy Document """
87
+ for data in vpc_endpoints ['VpcEndpoints' ]:
88
+ if data ['VpcEndpointId' ] in aws_sourcevpces :
89
+ matching_vpces .append (data ['VpcEndpointId' ])
90
+ return "VPC Endpoint(s): " + (", " .join (matching_vpces ))
91
+
92
+ if "aws:sourceip" in document :
93
+
94
+ """ Get ip found """
95
+ aws_sourceips = []
96
+ for vpce_tuple in SOURCE_IP_ADDRESS_REGEX .findall (document ):
97
+ aws_sourceips .append (vpce_tuple [1 ])
98
+ """ Get subnets cidr block """
99
+ ec2 = vpc_options .client ('ec2' )
100
+
101
+ filters = [{'Name' : 'vpc-id' ,
102
+ 'Values' : [vpc_options .vpc_id ]}]
103
+
104
+ subnets = ec2 .describe_subnets (Filters = filters )
105
+ overlapping_subnets = []
106
+ """ iterate ips found """
107
+ for ipfound in aws_sourceips :
108
+
109
+ """ Iterate subnets to match ipaddress """
110
+ for subnet in list (subnets ['Subnets' ]):
111
+ ipfound = ip_network (ipfound )
112
+ network_addres = ip_network (subnet ['CidrBlock' ])
113
+
114
+ if ipfound .overlaps (network_addres ):
115
+ overlapping_subnets .append ("{} ({})" .format (str (network_addres ), subnet ['SubnetId' ]))
116
+ if len (overlapping_subnets ) != 0 :
117
+ return "source IP(s): {} -> subnet CIDR(s): {}" \
118
+ .format (", " .join (aws_sourceips ), ", " .join (overlapping_subnets ))
119
+
120
+ return False
0 commit comments