-
Notifications
You must be signed in to change notification settings - Fork 38
/
throttled_indexer.py
82 lines (72 loc) · 2.66 KB
/
throttled_indexer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from typing import Mapping
from infra.default_lambda import RivDefaultFunction
from infra.storage.topology import RivSharedDataStores
from infra.interfaces import IVpcRivStack
import aws_cdk as core
from constructs import Construct
from aws_cdk import (
aws_lambda_event_sources as events,
aws_sqs as sqs,
aws_iam as iam,
)
class RivBulkLoaderThrottledIndexer(RivDefaultFunction):
'''
Represents a function that reads from SQS and writes into RiV.
'''
@property
def source_directory(self)->str:
return 'src/bulk-loader/throttled-indexer'
@property
def component_name(self)->str:
return 'ThrottledIndexer'
@property
def function_timeout(self)->core.Duration:
return core.Duration.minutes(5)
@property
def function_name(self) -> str:
return '{}-BulkLoading-{}'.format(
self.riv_stack.riv_stack_name,
self.component_name)
def __init__(self, scope: Construct, id: str, riv_stack:IVpcRivStack, sharedStorage:RivSharedDataStores, subnet_group_name:str='Default', env:Mapping[str,str]={}, **kwargs) -> None:
super().__init__(scope, id, **kwargs, riv_stack=riv_stack, subnet_group_name=subnet_group_name, env=env)
'''
Configure the Input Queue with redrive policy into a DLQ
'''
self.dead_letter_queue = sqs.Queue(self,'DeadLetterQueue')
self.input_queue = sqs.Queue(self,'InputQueue',
retention_period= core.Duration.days(7),
visibility_timeout= self.function_timeout,
dead_letter_queue=sqs.DeadLetterQueue(
max_receive_count=3,
queue=self.dead_letter_queue))
'''
Configure the lambda to trigger from the queue.
'''
self.function.add_event_source(events.SqsEventSource(
queue= self.input_queue,
batch_size = 1))
'''
Grant additional permissions on the image bucket...
'''
#sharedStorage.images.image_bucket.grant_read(self.function.role)
self.function.role.add_to_policy(statement=iam.PolicyStatement(
effect= iam.Effect.ALLOW,
actions=[
's3:GetObject*',
's3:GetBucket*',
's3:List*',
's3:PutObjectTagging',
],
resources=[
sharedStorage.images.image_bucket.bucket_arn,
sharedStorage.images.image_bucket.bucket_arn+'/*'
]))
'''
Grant read access to the SSM Parameters...
'''
self.function.role.add_to_policy(statement=iam.PolicyStatement(
effect= iam.Effect.ALLOW,
actions=['ssm:GetParameter*'],
resources=['arn:aws:ssm:{}:{}:parameter/riv/{}/userportal/url'.format(
core.Stack.of(self).region, core.Aws.ACCOUNT_ID, riv_stack.riv_stack_name)]
))