1
+ from shared .common import *
2
+ import json
3
+
4
+ class SQSPOLICY (object ):
5
+
6
+ def __init__ (self , vpc_options : VpcOptions ):
7
+ self .vpc_options = vpc_options
8
+
9
+ def run (self ):
10
+ try :
11
+ client = self .vpc_options .session .client ('sqs' , region_name = self .vpc_options .region_name )
12
+
13
+ response = client .list_queues ()
14
+
15
+ message_handler ("\n Checking SQS QUEUE POLICY..." , "HEADER" )
16
+
17
+ if not "QueueUrls" in response :
18
+ message_handler ("Found 0 SQS Queues in region {0}" .format (self .vpc_options .region_name ), "OKBLUE" )
19
+ else :
20
+ found = 0
21
+ message = ""
22
+
23
+ """ SQS Queue doesn't returns a dict"""
24
+ for idx , queue in enumerate (response ["QueueUrls" ]):
25
+
26
+ sqs_queue_policy = client .get_queue_attributes (QueueUrl = queue ,
27
+ AttributeNames = ['Policy' ])
28
+
29
+
30
+ if "Attributes" in sqs_queue_policy :
31
+
32
+ """ Not sure about boto3 return """
33
+ try :
34
+ documentpolicy = sqs_queue_policy ['Attributes' ]['Policy' ]
35
+
36
+ document = json .dumps (documentpolicy , default = datetime_to_string )
37
+
38
+ if self .vpc_options .vpc_id in document :
39
+ found += 1
40
+ message = message + "\n QueueUrl: {0} - VpcId {1}" .format (
41
+ queue ,
42
+ self .vpc_options .vpc_id
43
+ )
44
+ except :
45
+ pass
46
+
47
+
48
+ message_handler ("Found {0} SQS Queue Policy using VPC {1} {2}" .format (str (found ), self .vpc_options .vpc_id , message ),'OKBLUE' )
49
+
50
+ except Exception as e :
51
+ message = "Can't list SQS Queue Policy\n Error {0}" .format (str (e ))
52
+ exit_critical (message )
0 commit comments