Skip to content

Commit 2d321d2

Browse files
Added NAT GATEWAY and INTERNET GATEWAY checks #7
1 parent 29898e3 commit 2d321d2

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed

shared/awscommands.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from shared.common import *
22
from shared.internal.security import IAM, IAMPOLICY
3-
from shared.internal.network import VPC
3+
from shared.internal.network import VPC, IGW, NATGATEWAY
44
from shared.internal.compute import LAMBDA, EC2
55
from shared.internal.database import RDS, ELASTICACHE, DOCUMENTDB
66
from shared.internal.storage import EFS, S3POLICY
@@ -27,3 +27,5 @@ def run(self):
2727
DOCUMENTDB(self.vpc_options).run()
2828
SQSPOLICY(self.vpc_options).run()
2929
MSK(self.vpc_options).run()
30+
IGW(self.vpc_options).run()
31+
NATGATEWAY(self.vpc_options).run()

shared/internal/network.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,84 @@ def run(self):
2121
except Exception as e:
2222
message = "There is no VpcID \"{0}\" in region {1}.\nError {2}".format(self.vpc_options.vpc_id, self.vpc_options.region_name, str(e))
2323
exit_critical(message)
24+
25+
class INTERNETGATEWAY(object):
26+
27+
def __init__(self, vpc_options: VpcOptions):
28+
self.vpc_options = vpc_options
29+
30+
def run(self):
31+
32+
try:
33+
client = self.vpc_options.client('ec2')
34+
35+
filters = [{'Name': 'attachment.vpc-id',
36+
'Values': [self.vpc_options.vpc_id]}]
37+
38+
response = client.describe_internet_gateways(Filters=filters)
39+
40+
message_handler("\nChecking INTERNET GATEWAYS...", "HEADER")
41+
42+
""" One VPC has only 1 IGW then it's a direct check """
43+
if len(response["InternetGateways"]) == 0:
44+
message_handler("Found 0 Internet Gateway in region {0}".format(self.vpc_options.region_name), "OKBLUE")
45+
else:
46+
47+
found = 1
48+
49+
message = "\nInternetGatewayId: {} -> VPC id {}".format(
50+
response['InternetGateways'][0]['InternetGatewayId'],
51+
self.vpc_options.vpc_id
52+
)
53+
54+
message_handler("Found {0} Internet Gateway using VPC {1} {2}".format(str(found), \
55+
self.vpc_options.vpc_id, message), \
56+
'OKBLUE')
57+
58+
except Exception as e:
59+
message = "Can't list Internet Gateway\nError {0}".format(str(e))
60+
exit_critical(message)
61+
62+
63+
class NATGATEWAY(object):
64+
65+
def __init__(self, vpc_options: VpcOptions):
66+
self.vpc_options = vpc_options
67+
68+
def run(self):
69+
70+
try:
71+
client = self.vpc_options.client('ec2')
72+
73+
filters = [{'Name': 'vpc-id',
74+
'Values': [self.vpc_options.vpc_id]}]
75+
76+
response = client.describe_nat_gateways(Filters=filters)
77+
78+
message_handler("\nChecking NAT GATEWAYS...", "HEADER")
79+
80+
if len(response["NatGateways"]) == 0:
81+
message_handler("Found 0 NAT Gateways in region {0}".format(self.vpc_options.region_name), "OKBLUE")
82+
else:
83+
84+
found = 0
85+
message = ""
86+
87+
for data in response["NatGateways"]:
88+
89+
if data['VpcId'] == self.vpc_options.vpc_id:
90+
91+
found += 1
92+
message = message + "\nNatGatewayId: {} -> VPC id {}".format(
93+
data['NatGatewayId'],
94+
self.vpc_options.vpc_id
95+
)
96+
97+
message_handler("Found {0} NAT Gateways using VPC {1} {2}".format(str(found), self.vpc_options.vpc_id, message),'OKBLUE')
98+
99+
except Exception as e:
100+
message = "Can't list NAT Gateways\nError {0}".format(str(e))
101+
exit_critical(message)
102+
103+
""" alias """
104+
IGW = INTERNETGATEWAY

0 commit comments

Comments
 (0)