-
Notifications
You must be signed in to change notification settings - Fork 38
/
topology.py
89 lines (80 loc) · 3.63 KB
/
topology.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import builtins
from infra.bulkloader.batch_handler import RivBulkLoaderBatchHandler
from infra.bulkloader.throttled_indexer import RivBulkLoaderThrottledIndexer
from infra.storage.topology import RivSharedDataStores
from infra.bulkloader.inventory_created import RivBulkLoaderInventoryCreatedHandler
from infra.interfaces import IVpcRivStack
from constructs import Construct
from aws_cdk import (
aws_iam as iam,
#aws_dynamodb as ddb,
#aws_ssm as ssm,
)
class RivBulkLoader(Construct):
'''
Represents the root construct for the Bulk Loader Service.
'''
def __init__(self, scope: Construct, id: builtins.str, riv_stack:IVpcRivStack, sharedStorage:RivSharedDataStores, subnet_group_name:str='Default', **kwargs) -> None:
super().__init__(scope, id)
'''
Configure the Amazon S3 Batch Service role.
'''
self.batch_service_role = iam.Role(self,'BatchServiceRole',
assumed_by= iam.ServicePrincipal(service='batchoperations.s3.amazonaws.com'))
sharedStorage.images.image_bucket.grant_read(self.batch_service_role)
sharedStorage.images.inventory_bucket.grant_read_write(self.batch_service_role)
# '''
# Configure this Import History table.
# '''
# self.import_history_table = ddb.Table(self,'ImportTable',
# billing_mode= ddb.BillingMode.PAY_PER_REQUEST,
# removal_policy= core.RemovalPolicy.DESTROY,
# partition_key= ddb.Attribute(
# name='PartitionKey',
# type=ddb.AttributeType.STRING),
# sort_key=ddb.Attribute(
# name='SortKey',
# type=ddb.AttributeType.STRING),
# point_in_time_recovery=True)
'''
The batch job will determine which images qualify for processing.
Only applicable items are put into an SQS queue that throttles data loading speeds.
'''
self.throttled_indexer = RivBulkLoaderThrottledIndexer(self,'BatchIndexer',
riv_stack=riv_stack,
sharedStorage=sharedStorage,
subnet_group_name=subnet_group_name,
env={
# 'IMPORT_TABLE_NAME': self.import_history_table.table_name,
'RIV_STACK_NAME': riv_stack.riv_stack_name,
'USER_PORTAL_PARAM': '/riv/{}/userportal/url'.format(
riv_stack.riv_stack_name),
})
'''
S3 Batch iterates through the inventory list and passes the items to a lambda.
This lambda will determine if the S3 object (aka the image); qualifies for RIV indexing.
Reasons for skipping images include: already processed, incomplete information, etc.
'''
self.batch_handler = RivBulkLoaderBatchHandler(self,'BatchHandler',
riv_stack=riv_stack,
sharedStorage=sharedStorage,
subnet_group_name=subnet_group_name,
env={
'THROTTLED_QUEUE_URL': self.throttled_indexer.input_queue.queue_url,
# 'IMPORT_TABLE_NAME': self.import_history_table.table_name,
})
self.batch_handler.function.grant_invoke(self.batch_service_role)
'''
When the S3 inventory completes it raises an ObjectCreatedNotification in the inventory bucket.
This message forwards to an SNS Topic then into this function. After light-filtering and creates the S3 Batch job.
'''
self.inventory_created_handler = RivBulkLoaderInventoryCreatedHandler(self,'InventoryCreatedHandler',
riv_stack=riv_stack,
sharedStorage=sharedStorage,
subnet_group_name=subnet_group_name,
env={
'ACCOUNT_ID': core.Stack.of(self).account,
'BATCH_FUNCTION_ARN': self.batch_handler.function.function_arn,
'BATCH_ROLE_ARN': self.batch_service_role.role_arn,
'RIV_STACK_NAME': riv_stack.riv_stack_name,
})