Skip to content

Commit

Permalink
ETL - load func implemented pub/sub type of payload, cleanup driver c…
Browse files Browse the repository at this point in the history
…lass.
  • Loading branch information
Milo Hyben committed Aug 22, 2023
1 parent 98b403d commit 27fbb12
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 35 deletions.
94 changes: 72 additions & 22 deletions etl/load/main.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
import json
import logging
import os
import base64
from typing import Dict, Any

import functions_framework
import flask
import google.cloud.bigquery as bq


BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')

_BQ_CLIENT = bq.Client()


@functions_framework.http
def etl_load(request: flask.Request):
"""HTTP Cloud Function.
"""HTTP Cloud Function for ETL loading records from BQ to MySQL DB
This function accepts Pub/Sub push messages or can be called directly
For direct call request_id of BQ record to load is required, e.g. payload:
{
"request_id": "70eb6292-6311-44cf-9c9b-2b38bb076699"
}
or Pub/Sub push messages are wrapped,
atm Pulumi does not support unwrapping:
https://github.com/pulumi/pulumi-gcp/issues/1142
Example of Pub/Sub push payload, function only parse message.data
{
'message': {'data': 'eyJyZXF1ZXN0X2lkIjo ... LTViMWQt21pY3Mub3JnLmF1In0='
}
Args:
request (flask.Request): The request object.
<https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
Expand All @@ -25,36 +47,21 @@ def etl_load(request: flask.Request):
Functions, see the `Writing HTTP functions` page.
<https://cloud.google.com/functions/docs/writing/http#http_frameworks>
This function accepts Pub/Sub message, expected request_id in the payload:
{
"request_id": "70eb6292-6311-44cf-9c9b-2b38bb076699"
}
At the moment pulumi does not support unwrapping of push messages:
https://github.com/pulumi/pulumi-gcp/issues/1142
We need to support both
"""

auth = request.authorization
if not auth or not auth.token:
return {'success': False, 'message': 'No auth token provided'}, 401

logging.info(f'auth {auth}')

# if mimetype might not be set esp. when PubSub pushing from another topic,
# try to force conversion and if fails just return None
jbody = request.get_json(force=True, silent=True)

logging.info(f'jbody: {jbody}')

if callable(jbody):
# request.json is it in reality, but the type checker is saying it's callable
jbody = jbody()

request_id = jbody.get('request_id', None)
# check if request_id present
request_id = extract_request_id(jbody)
if not request_id:
jbody_str = json.dumps(jbody)
return {
Expand All @@ -72,13 +79,56 @@ def etl_load(request: flask.Request):

job_config = bq.QueryJobConfig()
job_config.query_parameters = query_params
query_job = _BQ_CLIENT.query(query, job_config=job_config).result()
query_job_result = _BQ_CLIENT.query(query, job_config=job_config).result()

if query_job_result.total_rows == 0:
return {
'success': False,
'message': f'Record with id: {request_id} not found',
}, 404

# should be only one record, look into loading multiple objects in one call?
for row in query_job:
row_json = None
for row in query_job_result:
# TODO
# Parse row.body -> Model and upload to metamist database
row_body = json.loads(row.body)
logging.info(f'row_body {row_body}')
row_json = json.loads(row.body)

return {'id': request_id, 'record': row_json, 'success': True}


def extract_request_id(jbody: Dict[str, Any]) -> str | None:
"""Unwrapp request id from the payload
return {'id': request_id, 'success': True}
Args:
jbody (Dict[str, Any]): Json body of payload
Returns:
str | None: ID of object to be loaded
"""
if not jbody:
return None

request_id = jbody.get('request_id')
if request_id:
return request_id

# try if payload is from pub/sub function
message = jbody.get('message')
if not message:
return None

data = message.get('data')
if not data:
return None

# data is not empty, decode and extract request_id
request_id = None
try:
data_decoded = base64.b64decode(data)
data_json = json.loads(data_decoded)
request_id = data_json.get('request_id')
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to extract request_id from the payload {e}')

return request_id
35 changes: 22 additions & 13 deletions metamist_infrastructure/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def setup_etl(self):
),
)

# pubsub_v1.PublisherClient.publish User not authorized to perform this action
# give the etl_service_account ability to push to pub/sub
gcp.projects.IAMMember(
'metamist-etl-editor-role',
project=self.config.sample_metadata.gcp.project,
Expand All @@ -378,15 +378,24 @@ def setup_etl_functions(self):
"""
setup_etl_functions
"""
self.etl_extract_function
self.etl_load_function
# TODO
return pulumi.ResourceOptions(
depends_on=[
self.etl_extract_function,
self.etl_load_function
],
)

def setup_etl_pubsub(self):
"""
setup_etl_pubsub
"""
self.etl_pubsub_dead_letter_subscription
self.etl_pubsub_push_subscription
return pulumi.ResourceOptions(
depends_on=[
self.etl_pubsub_dead_letter_subscription,
self.etl_pubsub_push_subscription
],
)

@cached_property
def etl_extract_function(self):
Expand Down Expand Up @@ -461,7 +470,7 @@ def etl_function(self, f_name: str):
),
'PUBSUB_TOPIC': self.etl_pubsub_topic.id,
# 'ALLOWED_USERS': '[email protected]',
'ALLOWED_USERS': '[email protected]',
# 'ALLOWED_USERS': '[email protected]',
},
ingress_settings='ALLOW_ALL',
all_traffic_on_latest_revision=True,
Expand All @@ -480,18 +489,18 @@ def setup_metamist_etl_accessors(self):
for name, sa in self.etl_accessors.items():
gcp.cloudfunctionsv2.FunctionIamMember(
f'metamist-etl-accessor-{name}',
location=self.etl_function.location,
project=self.etl_function.project,
cloud_function=self.etl_function.name,
location=self.etl_extract_function.location,
project=self.etl_extract_function.project,
cloud_function=self.etl_extract_function.name,
role='roles/cloudfunctions.invoker',
member=pulumi.Output.concat('serviceAccount:', sa.email),
)

gcp.cloudrun.IamMember(
f'metamist-etl-run-accessor-{name}',
location=self.etl_function.location,
project=self.etl_function.project,
service=self.etl_function.name, # it shared the name
location=self.etl_extract_function.location,
project=self.etl_extract_function.project,
service=self.etl_extract_function.name, # it shared the name
role='roles/run.invoker',
member=pulumi.Output.concat('serviceAccount:', sa.email),
)
Expand All @@ -501,7 +510,7 @@ def setup_slack_notification(self):
return

# Slack notifications
filter_string = self.etl_function.name.apply(
filter_string = self.etl_extract_function.name.apply(
lambda fxn_name: f"""
resource.type="cloud_function"
AND resource.labels.function_name="{fxn_name}"
Expand Down

0 comments on commit 27fbb12

Please sign in to comment.