diff --git a/etl/load/main.py b/etl/load/main.py index 7d89a4619..e6ac4c1b9 100644 --- a/etl/load/main.py +++ b/etl/load/main.py @@ -1,10 +1,14 @@ import json import logging import os +import base64 +from typing import Dict, Any + import functions_framework import flask import google.cloud.bigquery as bq + BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE') _BQ_CLIENT = bq.Client() @@ -12,7 +16,25 @@ @functions_framework.http def etl_load(request: flask.Request): - """HTTP Cloud Function. + """HTTP Cloud Function for ETL loading records from BQ to MySQL DB + + This function accepts Pub/Sub push messages or can be called directly + For direct call request_id of BQ record to load is required, e.g. payload: + + { + "request_id": "70eb6292-6311-44cf-9c9b-2b38bb076699" + } + + or Pub/Sub push messages are wrapped, + atm Pulumi does not support unwrapping: + https://github.com/pulumi/pulumi-gcp/issues/1142 + + Example of Pub/Sub push payload, function only parse message.data + + { + 'message': {'data': 'eyJyZXF1ZXN0X2lkIjo ... LTViMWQt21pY3Mub3JnLmF1In0=' + } + Args: request (flask.Request): The request object. @@ -25,36 +47,21 @@ def etl_load(request: flask.Request): Functions, see the `Writing HTTP functions` page. - This function accepts Pub/Sub message, expected request_id in the payload: - - { - "request_id": "70eb6292-6311-44cf-9c9b-2b38bb076699" - } - - At the moment pulumi does not support unwrapping of push messages: - https://github.com/pulumi/pulumi-gcp/issues/1142 - - We need to support both """ auth = request.authorization if not auth or not auth.token: return {'success': False, 'message': 'No auth token provided'}, 401 - logging.info(f'auth {auth}') - # if mimetype might not be set esp. when PubSub pushing from another topic, # try to force conversion and if fails just return None jbody = request.get_json(force=True, silent=True) - logging.info(f'jbody: {jbody}') - if callable(jbody): # request.json is it in reality, but the type checker is saying it's callable jbody = jbody() - request_id = jbody.get('request_id', None) - # check if request_id present + request_id = extract_request_id(jbody) if not request_id: jbody_str = json.dumps(jbody) return { @@ -72,13 +79,56 @@ def etl_load(request: flask.Request): job_config = bq.QueryJobConfig() job_config.query_parameters = query_params - query_job = _BQ_CLIENT.query(query, job_config=job_config).result() + query_job_result = _BQ_CLIENT.query(query, job_config=job_config).result() + + if query_job_result.total_rows == 0: + return { + 'success': False, + 'message': f'Record with id: {request_id} not found', + }, 404 # should be only one record, look into loading multiple objects in one call? - for row in query_job: + row_json = None + for row in query_job_result: # TODO # Parse row.body -> Model and upload to metamist database - row_body = json.loads(row.body) - logging.info(f'row_body {row_body}') + row_json = json.loads(row.body) + + return {'id': request_id, 'record': row_json, 'success': True} + + +def extract_request_id(jbody: Dict[str, Any]) -> str | None: + """Unwrapp request id from the payload - return {'id': request_id, 'success': True} + Args: + jbody (Dict[str, Any]): Json body of payload + + Returns: + str | None: ID of object to be loaded + """ + if not jbody: + return None + + request_id = jbody.get('request_id') + if request_id: + return request_id + + # try if payload is from pub/sub function + message = jbody.get('message') + if not message: + return None + + data = message.get('data') + if not data: + return None + + # data is not empty, decode and extract request_id + request_id = None + try: + data_decoded = base64.b64decode(data) + data_json = json.loads(data_decoded) + request_id = data_json.get('request_id') + except Exception as e: # pylint: disable=broad-exception-caught + logging.error(f'Failed to extract request_id from the payload {e}') + + return request_id diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index 868ba15ff..2d8060c43 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -358,7 +358,7 @@ def setup_etl(self): ), ) - # pubsub_v1.PublisherClient.publish User not authorized to perform this action + # give the etl_service_account ability to push to pub/sub gcp.projects.IAMMember( 'metamist-etl-editor-role', project=self.config.sample_metadata.gcp.project, @@ -378,15 +378,24 @@ def setup_etl_functions(self): """ setup_etl_functions """ - self.etl_extract_function - self.etl_load_function + # TODO + return pulumi.ResourceOptions( + depends_on=[ + self.etl_extract_function, + self.etl_load_function + ], + ) def setup_etl_pubsub(self): """ setup_etl_pubsub """ - self.etl_pubsub_dead_letter_subscription - self.etl_pubsub_push_subscription + return pulumi.ResourceOptions( + depends_on=[ + self.etl_pubsub_dead_letter_subscription, + self.etl_pubsub_push_subscription + ], + ) @cached_property def etl_extract_function(self): @@ -461,7 +470,7 @@ def etl_function(self, f_name: str): ), 'PUBSUB_TOPIC': self.etl_pubsub_topic.id, # 'ALLOWED_USERS': 'michael.franklin@populationgenomics.org.au', - 'ALLOWED_USERS': 'miloslav.hyben@populationgenomics.org.au', + # 'ALLOWED_USERS': 'miloslav.hyben@populationgenomics.org.au', }, ingress_settings='ALLOW_ALL', all_traffic_on_latest_revision=True, @@ -480,18 +489,18 @@ def setup_metamist_etl_accessors(self): for name, sa in self.etl_accessors.items(): gcp.cloudfunctionsv2.FunctionIamMember( f'metamist-etl-accessor-{name}', - location=self.etl_function.location, - project=self.etl_function.project, - cloud_function=self.etl_function.name, + location=self.etl_extract_function.location, + project=self.etl_extract_function.project, + cloud_function=self.etl_extract_function.name, role='roles/cloudfunctions.invoker', member=pulumi.Output.concat('serviceAccount:', sa.email), ) gcp.cloudrun.IamMember( f'metamist-etl-run-accessor-{name}', - location=self.etl_function.location, - project=self.etl_function.project, - service=self.etl_function.name, # it shared the name + location=self.etl_extract_function.location, + project=self.etl_extract_function.project, + service=self.etl_extract_function.name, # it shared the name role='roles/run.invoker', member=pulumi.Output.concat('serviceAccount:', sa.email), ) @@ -501,7 +510,7 @@ def setup_slack_notification(self): return # Slack notifications - filter_string = self.etl_function.name.apply( + filter_string = self.etl_extract_function.name.apply( lambda fxn_name: f""" resource.type="cloud_function" AND resource.labels.function_name="{fxn_name}"