Skip to content

Commit 98b403d

Browse files
author
Milo Hyben
committed
Rename post to extract, implement Pub/Sub functionality in the ETL driver.
1 parent f34c8b6 commit 98b403d

File tree

5 files changed

+210
-19
lines changed

5 files changed

+210
-19
lines changed
File renamed without changes.

etl/post/main.py renamed to etl/extract/main.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020

2121
@functions_framework.http
22-
def etl_post(request: flask.Request):
22+
def etl_extract(request: flask.Request):
2323
"""HTTP Cloud Function.
2424
Args:
2525
request (flask.Request): The request object.
@@ -59,12 +59,13 @@ def etl_post(request: flask.Request):
5959
'request_id': request_id,
6060
'timestamp': datetime.datetime.utcnow().isoformat(),
6161
'type': request.path,
62-
'submitting_user': email_from_id_token(auth.token),
63-
'body': jbody_str,
62+
'submitting_user': email_from_id_token(auth.token)
6463
}
6564

6665
# throw an exception if one occurs
67-
errors = _BQ_CLIENT.insert_rows_json(BIGQUERY_TABLE, [bq_obj])
66+
errors = _BQ_CLIENT.insert_rows_json(
67+
BIGQUERY_TABLE, [bq_obj | {'body': jbody_str}]
68+
)
6869
if errors:
6970
return {
7071
'success': False,
@@ -73,10 +74,13 @@ def etl_post(request: flask.Request):
7374
}, 500
7475

7576
# publish to pubsub
76-
# message contains all the attributes except body which can be large and already stored in BQ table
77-
pb_obj = {k: v for k, v in bq_obj.items() if k not in ['body']}
77+
# message contains all the attributes except body which can be large
78+
# and already stored in BQ table
7879
try:
79-
_PUBSUB_CLIENT.publish(PUBSUB_TOPIC, json.dumps(pb_obj).encode(), content_type='application/json')
80+
_PUBSUB_CLIENT.publish(
81+
PUBSUB_TOPIC,
82+
json.dumps(bq_obj).encode()
83+
)
8084
except Exception as e: # pylint: disable=broad-exception-caught
8185
logging.error(f'Failed to publish to pubsub: {e}')
8286

File renamed without changes.

etl/load/main.py

+8
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,24 @@ def etl_load(request: flask.Request):
3131
"request_id": "70eb6292-6311-44cf-9c9b-2b38bb076699"
3232
}
3333
34+
At the moment pulumi does not support unwrapping of push messages:
35+
https://github.com/pulumi/pulumi-gcp/issues/1142
36+
37+
We need to support both wrapped and unwrapped push-message formats
3438
"""
3539

3640
auth = request.authorization
3741
if not auth or not auth.token:
3842
return {'success': False, 'message': 'No auth token provided'}, 401
3943

44+
logging.info(f'auth {auth}')
45+
4046
# if mimetype might not be set esp. when PubSub pushing from another topic,
4147
# try to force conversion and if fails just return None
4248
jbody = request.get_json(force=True, silent=True)
4349

50+
logging.info(f'jbody: {jbody}')
51+
4452
if callable(jbody):
4553
# request.json is it in reality, but the type checker is saying it's callable
4654
jbody = jbody()

metamist_infrastructure/driver.py

+191-12
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
from cpg_infra.utils import archive_folder
1515

1616
# this gets moved around during the pip install
17-
ETL_FOLDER = Path(__file__).parent / 'etl'
17+
# ETL_FOLDER = Path(__file__).parent / 'etl'
18+
ETL_FOLDER = Path(__file__).parent.parent / 'etl'
1819
PATH_TO_ETL_BQ_SCHEMA = ETL_FOLDER / 'bq_schema.json'
19-
PATH_TO_ETL_ENDPOINT = ETL_FOLDER / 'endpoint'
20+
# PATH_TO_ETL_EXTRACT = ETL_FOLDER / 'extract'
21+
# PATH_TO_ETL_ENDPOINT = ETL_FOLDER / 'load'
2022

2123

2224
class MetamistInfrastructure(CpgInfrastructurePlugin):
@@ -103,7 +105,7 @@ def source_bucket(self):
103105
in a Google Cloud Storage bucket.
104106
"""
105107
return gcp.storage.Bucket(
106-
f'metamist-source-bucket',
108+
'metamist-source-bucket',
107109
name=f'{self.config.gcp.dataset_storage_prefix}metamist-source-bucket',
108110
location=self.config.gcp.region,
109111
project=self.config.sample_metadata.gcp.project,
@@ -143,11 +145,97 @@ def etl_pubsub_topic(self):
143145
Pubsub topic to trigger the etl function
144146
"""
145147
return gcp.pubsub.Topic(
146-
f'metamist-etl-topic',
148+
'metamist-etl-topic',
147149
project=self.config.sample_metadata.gcp.project,
148150
opts=pulumi.ResourceOptions(depends_on=[self._svc_pubsub]),
149151
)
150152

153+
@cached_property
154+
def etl_pubsub_dead_letters_topic(self):
155+
"""
156+
Pubsub dead_letters topic to capture failed jobs
157+
"""
158+
topic = gcp.pubsub.Topic(
159+
'metamist-etl-dead-letters-topic',
160+
project=self.config.sample_metadata.gcp.project,
161+
opts=pulumi.ResourceOptions(depends_on=[self._svc_pubsub]),
162+
)
163+
164+
# give publisher permission to service account
165+
gcp.pubsub.TopicIAMPolicy(
166+
'metamist-etl-dead-letters-topic-iam-policy',
167+
project=self.config.sample_metadata.gcp.project,
168+
topic=topic.name,
169+
policy_data=self.prepare_service_account_policy_data(
170+
'roles/pubsub.publisher'
171+
)
172+
)
173+
174+
return topic
175+
176+
@cached_property
177+
def etl_pubsub_push_subscription(self):
178+
"""
179+
Pubsub push subscription to the topic; new messages on the topic trigger the load process
180+
"""
181+
subscription = gcp.pubsub.Subscription(
182+
'metamist-etl-subscription',
183+
topic=self.etl_pubsub_topic.name,
184+
ack_deadline_seconds=20,
185+
dead_letter_policy=gcp.pubsub.SubscriptionDeadLetterPolicyArgs(
186+
dead_letter_topic=self.etl_pubsub_dead_letters_topic.id,
187+
max_delivery_attempts=5,
188+
),
189+
push_config=gcp.pubsub.SubscriptionPushConfigArgs(
190+
push_endpoint=self.etl_load_function.service_config.uri,
191+
oidc_token=gcp.pubsub.SubscriptionPushConfigOidcTokenArgs(
192+
service_account_email=self.etl_service_account.email,
193+
),
194+
attributes={
195+
'x-goog-version': 'v1',
196+
},
197+
),
198+
project=self.config.sample_metadata.gcp.project,
199+
opts=pulumi.ResourceOptions(
200+
depends_on=[
201+
self._svc_pubsub,
202+
self.etl_pubsub_topic,
203+
self.etl_load_function,
204+
self.etl_pubsub_dead_letters_topic,
205+
self.etl_pubsub_dead_letter_subscription
206+
]
207+
),
208+
)
209+
210+
# give subscriber permission to service account
211+
gcp.pubsub.SubscriptionIAMPolicy(
212+
'metamist-etl-pubsub-topic-subscription-policy',
213+
project=self.config.sample_metadata.gcp.project,
214+
subscription=subscription.name,
215+
policy_data=self.prepare_service_account_policy_data(
216+
'roles/pubsub.subscriber'
217+
)
218+
)
219+
220+
return subscription
221+
222+
@cached_property
223+
def etl_pubsub_dead_letter_subscription(self):
224+
"""
225+
Dead letter subscription
226+
"""
227+
return gcp.pubsub.Subscription(
228+
'metamist-etl-dead-letter-subscription',
229+
topic=self.etl_pubsub_dead_letters_topic.name,
230+
project=self.config.sample_metadata.gcp.project,
231+
ack_deadline_seconds=20,
232+
opts=pulumi.ResourceOptions(
233+
depends_on=[
234+
self.etl_pubsub_dead_letters_topic
235+
]
236+
),
237+
)
238+
151239
@cached_property
152240
def etl_bigquery_dataset(self):
153241
"""
@@ -219,7 +307,38 @@ def slack_channel(self):
219307
project=self.config.sample_metadata.gcp.project,
220308
)
221309

310+
def prepare_service_account_policy_data(self, role):
311+
"""
312+
prepare_service_account_policy_data
313+
314+
Args:
315+
role (_type_): _description_
316+
317+
Returns:
318+
_type_: _description_
319+
"""
320+
# get project
321+
project = gcp.organizations.get_project()
322+
323+
return gcp.organizations.get_iam_policy(
324+
bindings=[
325+
gcp.organizations.GetIAMPolicyBindingArgs(
326+
role=role,
327+
members=[
328+
pulumi.Output.concat(
329+
'serviceAccount:service-',
330+
project.number,
331+
'@gcp-sa-pubsub.iam.gserviceaccount.com'
332+
)
333+
],
334+
)
335+
]
336+
).policy_data
337+
222338
def setup_etl(self):
339+
"""
340+
setup_etl
341+
"""
223342
# give the etl_service_account ability to write to bigquery
224343
gcp.bigquery.DatasetAccess(
225344
'metamist-etl-bq-dataset-access',
@@ -229,21 +348,80 @@ def setup_etl(self):
229348
user_by_email=self.etl_service_account.email,
230349
)
231350

351+
# give the etl_service_account ability to execute bigquery jobs
352+
gcp.projects.IAMMember(
353+
'metamist-etl-bq-job-user-role',
354+
project=self.config.sample_metadata.gcp.project,
355+
role='roles/bigquery.jobUser',
356+
member=pulumi.Output.concat(
357+
'serviceAccount:', self.etl_service_account.email
358+
),
359+
)
360+
361+
# pubsub_v1.PublisherClient.publish User not authorized to perform this action
362+
gcp.projects.IAMMember(
363+
'metamist-etl-editor-role',
364+
project=self.config.sample_metadata.gcp.project,
365+
role='roles/editor',
366+
member=pulumi.Output.concat(
367+
'serviceAccount:', self.etl_service_account.email
368+
),
369+
)
370+
371+
self.setup_etl_functions()
372+
self.setup_etl_pubsub()
373+
232374
self.setup_metamist_etl_accessors()
233375
self.setup_slack_notification()
234376

377+
def setup_etl_functions(self):
378+
"""
379+
setup_etl_functions
380+
"""
381+
self.etl_extract_function
382+
self.etl_load_function
383+
384+
def setup_etl_pubsub(self):
385+
"""
386+
setup_etl_pubsub
387+
"""
388+
self.etl_pubsub_dead_letter_subscription
389+
self.etl_pubsub_push_subscription
390+
235391
@cached_property
236-
def etl_function(self):
392+
def etl_extract_function(self):
237393
"""
238-
Driver function to setup the etl infrastructure
394+
etl_extract_function
395+
396+
Returns:
397+
_type_: _description_
239398
"""
399+
return self.etl_function('extract')
400+
401+
@cached_property
402+
def etl_load_function(self):
403+
"""
404+
etl_load_function
405+
406+
Returns:
407+
_type_: _description_
408+
"""
409+
return self.etl_function('load')
410+
411+
def etl_function(self, f_name: str):
412+
"""
413+
Driver function to setup the etl cloud function
414+
"""
415+
416+
path_to_func_folder = ETL_FOLDER / f_name
417+
240418
# The Cloud Function source code itself needs to be zipped up into an
241419
# archive, which we create using the pulumi.AssetArchive primitive.
242-
archive = archive_folder(str(PATH_TO_ETL_ENDPOINT.absolute()))
420+
archive = archive_folder(str(path_to_func_folder.absolute()))
243421

244422
# Create the single Cloud Storage object, which contains the source code
245423
source_archive_object = gcp.storage.BucketObject(
246-
'metamist-etl-endpoint-source-code',
424+
f'metamist-etl-{f_name}-source-code',
247425
# updating the source archive object does not trigger the cloud function
248426
# to actually updating the source because it's based on the name,
249427
# allow Pulumi to create a new name each time it gets updated
@@ -253,11 +431,11 @@ def etl_function(self):
253431
)
254432

255433
fxn = gcp.cloudfunctionsv2.Function(
256-
'metamist-etl-endpoint-source-code',
257-
name='metamist-etl',
434+
f'metamist-etl-{f_name}-source-code',
435+
name=f'metamist-etl-{f_name}',
258436
build_config=gcp.cloudfunctionsv2.FunctionBuildConfigArgs(
259437
runtime='python311',
260-
entry_point='etl_post',
438+
entry_point=f'etl_{f_name}',
261439
environment_variables={},
262440
source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs(
263441
storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs(
@@ -282,7 +460,8 @@ def etl_function(self):
282460
self.etl_bigquery_table.table_id,
283461
),
284462
'PUBSUB_TOPIC': self.etl_pubsub_topic.id,
285-
'ALLOWED_USERS': '[email protected]',
463+
# 'ALLOWED_USERS': '[email protected]',
464+
'ALLOWED_USERS': '[email protected]',
286465
},
287466
ingress_settings='ALLOW_ALL',
288467
all_traffic_on_latest_revision=True,

0 commit comments

Comments
 (0)