From ebbba83003b4f75f99f8b168f57a6e4f0738bf72 Mon Sep 17 00:00:00 2001 From: Milo Hyben Date: Wed, 13 Sep 2023 16:58:46 +1000 Subject: [PATCH] Fixed linting issues. --- etl/load/main.py | 143 ++++++++++-------- etl/notification/main.py | 6 +- etl/notification/requirements.txt | 2 +- etl/test/test_etl_load.py | 6 +- metamist_infrastructure/driver.py | 27 ++-- metamist_infrastructure/slack_notification.py | 97 ++++++------ 6 files changed, 152 insertions(+), 129 deletions(-) diff --git a/etl/load/main.py b/etl/load/main.py index 39a939043..4bf1400a2 100644 --- a/etl/load/main.py +++ b/etl/load/main.py @@ -10,9 +10,10 @@ import functions_framework import google.cloud.bigquery as bq +from google.cloud import pubsub_v1 + # import all public parsers import metamist.parser as mp -from google.cloud import pubsub_v1 # try to import private parsers # try: @@ -27,14 +28,8 @@ def call_parser(parser_obj, row_json): - """_summary_ - - Args: - parser_obj (_type_): _description_ - row_json (_type_): _description_ - - Returns: - _type_: _description_ + """ + This function calls parser_obj.from_json and returns status and result """ tmp_res = [] tmp_status = [] @@ -48,7 +43,7 @@ async def run_parser_capture_result(parser_obj, row_data, res, status): res.append(r) status.append('SUCCESS') except Exception as e: # pylint: disable=broad-exception-caught - logging.error(f'Failed to parse the row {e}') + logging.error(f'Failed to parse: {e}') # add to the output res.append(e) status.append('FAILED') @@ -57,6 +52,70 @@ async def run_parser_capture_result(parser_obj, row_data, res, status): return tmp_status[0], tmp_res[0] +def process_rows(query_job_result, delivery_attempt, request_id, parser_map, bq_client): + """ + Process BQ results rows, should be only one row + """ + status = None + parsing_result = None + row_json = None + + for row in query_job_result: + sample_type = row.type + # sample_type should be in the format /ParserName/Version e.g.: /bbv/v1 + + row_json = 
json.loads(row.body) + # get config from payload or use default + config_data = row_json.get('config') + # get data from payload or use payload as data + record_data = row_json.get('data', row_json) + + parser_obj = get_parser_instance(parser_map, sample_type, config_data) + if parser_obj: + # Parse row.body -> Model and upload to metamist database + status, parsing_result = call_parser(parser_obj, record_data) + else: + status = 'FAILED' + parsing_result = f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}' + + if delivery_attempt == 1: + # log only at the first attempt + log_details = { + 'type': sample_type, + 'submitting_user': row.submitting_user, + 'result': f'{parsing_result}', + } + + # log results to BIGQUERY_LOG_TABLE + log_record = { + 'request_id': request_id, + 'timestamp': datetime.datetime.utcnow().isoformat(), + 'status': status, + 'details': json.dumps(log_details), + } + + errors = bq_client.insert_rows_json( + BIGQUERY_LOG_TABLE, + [log_record], + ) + if errors: + logging.error(f'Failed to log to BQ: {errors}') + + if status == 'FAILED': + # publish to notification pubsub + msg_title = 'Metamist ETL Load Failed' + try: + pubsub_client = pubsub_v1.PublisherClient() + pubsub_client.publish( + NOTIFICATION_PUBSUB_TOPIC, + json.dumps({'title': msg_title} | log_record).encode(), + ) + except Exception as e: # pylint: disable=broad-exception-caught + logging.error(f'Failed to publish to pubsub: {e}') + + return status, parsing_result, row_json + + @functions_framework.http def etl_load(request: flask.Request): """HTTP Cloud Function for ETL loading records from BQ to MySQL DB @@ -138,72 +197,24 @@ def etl_load(request: flask.Request): }, 412 # Precondition Failed # should be only one record, look into loading multiple objects in one call? 
- row_json = None - result = None - status = None - log_record = None - for row in query_job_result: - sample_type = row.type - # sample_type should be in the format /ParserName/Version e.g.: /bbv/v1 - - row_json = json.loads(row.body) - # get config from payload or use default - config_data = row_json.get('config') - # get data from payload or use payload as data - record_data = row_json.get('data', row_json) - - parser_obj = get_parser_instance(parser_map, sample_type, config_data) - if parser_obj: - # Parse row.body -> Model and upload to metamist database - status, result = call_parser(parser_obj, record_data) - else: - status = 'FAILED' - result = f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}' - - if delivery_attempt == 1: - # log results to BIGQUERY_LOG_TABLE - log_record = { - 'request_id': request_id, - 'timestamp': datetime.datetime.utcnow().isoformat(), - 'status': status, - 'details': { - 'type': sample_type, - 'submitting_user': row.submitting_user, - 'result': f"'{result}'", # convert to string - } - # json.dumps({'result': f"'{result}'"}), - } - bq_client.insert_rows_json( - BIGQUERY_LOG_TABLE, - [log_record], - ) - - if status == 'FAILED': - # publish to notification pubsub - msg_title = 'Metamist ETL Load Failed' - try: - pubsub_client = pubsub_v1.PublisherClient() - pubsub_client.publish( - NOTIFICATION_PUBSUB_TOPIC, - json.dumps({'title': msg_title} | log_record).encode(), - ) - except Exception as e: # pylint: disable=broad-exception-caught - logging.error(f'Failed to publish to pubsub: {e}') + (status, parsing_result, last_record) = process_rows( + query_job_result, delivery_attempt, request_id, parser_map, bq_client + ) # return success if status and status == 'SUCCESS': return { 'id': request_id, - 'record': row_json, - 'result': f"'{result}'", + 'record': last_record, + 'result': f'{parsing_result}', 'success': True, } - # return error + # otherwise return error return { 'id': request_id, - 'record': 
row_json, - 'result': f"'{result}'", + 'record': last_record, + 'result': f'{parsing_result}', 'success': False, }, 500 diff --git a/etl/notification/main.py b/etl/notification/main.py index 0c90970c3..c1e2e00b6 100644 --- a/etl/notification/main.py +++ b/etl/notification/main.py @@ -52,14 +52,12 @@ def etl_notify(request: flask.Request): }, 400 # TODO: format message to slack message - print(type(message), message) message_blocks = format_slack(message) - print("message_blocks", message_blocks) success = None try: client = WebClient(token=SLACK_BOT_TOKEN) response = client.chat_postMessage(channel=SLACK_CHANNEL, blocks=message_blocks) - success = response.get('ok') == True + success = response.get('ok') is True except SlackApiError as e: return { 'success': False, @@ -93,7 +91,7 @@ def format_slack(message: Dict[str, Any]) -> Any | None: if key in ['title', 'status']: continue message_sections.append( - {"type": "section", "text": {"type": "mrkdwn", "text": f"*{key}*: {value}"}} + {'type': 'section', 'text': {'type': 'mrkdwn', 'text': f'*{key}*: {value}'}} ) return message_sections diff --git a/etl/notification/requirements.txt b/etl/notification/requirements.txt index 617edf595..be276557b 100644 --- a/etl/notification/requirements.txt +++ b/etl/notification/requirements.txt @@ -3,4 +3,4 @@ functions_framework google-cloud-bigquery google-cloud-logging google-cloud-pubsub -slack_sdk \ No newline at end of file +slack_sdk diff --git a/etl/test/test_etl_load.py b/etl/test/test_etl_load.py index 218b2bfc4..6c6c129c0 100644 --- a/etl/test/test_etl_load.py +++ b/etl/test/test_etl_load.py @@ -138,9 +138,7 @@ async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client { 'id': '1234567890', 'record': json.loads(ETL_SAMPLE_RECORD_2), - 'result': '' - "'Missing or invalid sample_type: /bbv/v1 in the record with id: 1234567890'" - '', + 'result': 'Missing or invalid sample_type: /bbv/v1 in the record with id: 1234567890', 'success': False, }, ) @@ -200,7 
+198,7 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client { 'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836', 'record': json.loads(ETL_SAMPLE_RECORD_3), - 'result': "''", + 'result': '', 'success': True, }, ) diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index 5ed322dca..955620085 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -13,7 +13,11 @@ from cpg_infra.plugin import CpgInfrastructurePlugin # from cpg_infra.utils import archive_folder -from slack_notification import SlackNotification, SlackNotificationType +from slack_notification import ( + SlackNotification, + SlackNotificationType, + SlackNotificationConfig, +) # this gets moved around during the pip install ETL_FOLDER = Path(__file__).parent / 'etl' @@ -57,7 +61,7 @@ def main(self): """Driver for the metamist infrastructure as code plugin""" # todo, eventually configure metamist cloud run server # to be deployed here, but for now it's manually deployed - self.setup_etl() + self._setup_etl() @cached_property def _svc_cloudresourcemanager(self): @@ -151,7 +155,7 @@ def etl_service_account(self): ), ) - def etl_function_account(self, f_name: str): + def _etl_function_account(self, f_name: str): """ Service account for cloud function """ @@ -167,12 +171,12 @@ def etl_function_account(self, f_name: str): @cached_property def etl_load_service_account(self): """Service account to run load/transform functionality""" - return self.etl_function_account('load') + return self._etl_function_account('load') @cached_property def etl_extract_service_account(self): """Service account to run extract functionality""" - return self.etl_function_account('extract') + return self._etl_function_account('extract') @cached_property def etl_accessors(self): @@ -368,7 +372,7 @@ def prepare_service_account_policy_data(self, role): ] ).policy_data - def setup_etl(self): + def _setup_etl(self): """ setup_etl """ @@ -577,18 +581,19 
@@ def etl_slack_notification(self): Setup Slack notification """ - project = gcp.organizations.get_project() - - notification = SlackNotification( + slack_config = SlackNotificationConfig( project_name=self.config.sample_metadata.gcp.project, - project_number=project.number, location=self.config.gcp.region, service_account=self.etl_service_account, # can be some other account source_bucket=self.source_bucket, - topic_name='metamist-etl-notification', slack_secret_project_id=self.config.billing.gcp.project_id, slack_token_secret_name=self.config.billing.aggregator.slack_token_secret_name, slack_channel_name=self.config.sample_metadata.slack_channel, + ) + + notification = SlackNotification( + slack_config=slack_config, + topic_name='metamist-etl-notification', func_to_monitor=[ 'metamist-etl-notification-func', 'metamist-etl-extract', diff --git a/metamist_infrastructure/slack_notification.py b/metamist_infrastructure/slack_notification.py index 5e6322a67..56cd646cb 100644 --- a/metamist_infrastructure/slack_notification.py +++ b/metamist_infrastructure/slack_notification.py @@ -49,38 +49,45 @@ class SlackNotificationType(Enum): BOTH = 3 -class SlackNotification: - """ - Metamist Infrastructure Notification Slack Plugin - """ +class SlackNotificationConfig: + """Slack token, channel and project id wrapped in the config class""" def __init__( self, project_name: str, - project_number: str, location: str, # e.g. self.config.gcp.region service_account: object, source_bucket: object, - topic_name: str, # e.g. 
'metamist-etl-notification' slack_secret_project_id: str, slack_token_secret_name: str, slack_channel_name: str, - func_to_monitor: list | None, - notification_type: SlackNotificationType, - depends_on: list | None, ): - """Slack notification constructor""" + """Slack notification config constructor""" self.project_name = project_name - self.project_number = project_number self.location = location self.service_account = service_account self.source_bucket = source_bucket - self.topic_name = topic_name - self.slack_secret_project_id = slack_secret_project_id self.slack_token_secret_name = slack_token_secret_name self.slack_channel_name = slack_channel_name + +class SlackNotification: + """ + Metamist Infrastructure Notification Slack Plugin + """ + + def __init__( + self, + slack_config: SlackNotificationConfig, + topic_name: str, # e.g. 'metamist-etl-notification' + func_to_monitor: list | None, + notification_type: SlackNotificationType, + depends_on: list | None, + ): + """Slack notification constructor""" + self.config = slack_config + self.topic_name = topic_name self.func_to_monitor = func_to_monitor self.notification_type = notification_type self.depends_on = depends_on @@ -89,9 +96,11 @@ def _setup_notification_permissions(self): """Setup permissions for the service account""" gcp.projects.IAMMember( f'{self.topic_name}-sa-run-invoker-role', - project=self.project_name, + project=self.config.project_name, role='roles/run.invoker', - member=pulumi.Output.concat('serviceAccount:', self.service_account.email), + member=pulumi.Output.concat( + 'serviceAccount:', self.config.service_account.email + ), ) def _incident_setup_alerts_slack_notification(self): @@ -155,7 +164,7 @@ def notification_cloudfun(self): # function to actually updating the source because # it's based on the name, # allow Pulumi to create a new name each time it gets updated - bucket=self.source_bucket.name, + bucket=self.config.source_bucket.name, source=archive, 
opts=pulumi.ResourceOptions(replace_on_changes=['*']), ) @@ -169,7 +178,7 @@ def notification_cloudfun(self): environment_variables={}, source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs( storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs( - bucket=self.source_bucket.name, + bucket=self.config.source_bucket.name, object=source_archive_object.name, ), ), @@ -183,18 +192,18 @@ def notification_cloudfun(self): environment_variables={ 'SLACK_BOT_TOKEN': read_secret( # reuse this secret :) - project_id=self.slack_secret_project_id, - secret_name=self.slack_token_secret_name, + project_id=self.config.slack_secret_project_id, + secret_name=self.config.slack_token_secret_name, fail_gracefully=False, ), - 'SLACK_CHANNEL': self.slack_channel_name, + 'SLACK_CHANNEL': self.config.slack_channel_name, }, ingress_settings='ALLOW_ALL', all_traffic_on_latest_revision=True, - service_account_email=self.service_account.email, + service_account_email=self.config.service_account.email, ), - project=self.project_name, - location=self.location, + project=self.config.project_name, + location=self.config.location, opts=pulumi.ResourceOptions(depends_on=self.depends_on), ) return fxn @@ -210,17 +219,17 @@ def incident_alerts_channel(self): f'{self.topic_name}-incidents-channel', display_name=f'{self.topic_name} incidents slack notification channel', type='slack', - labels={'channel_name': self.slack_channel_name}, + labels={'channel_name': self.config.slack_channel_name}, sensitive_labels=gcp.monitoring.NotificationChannelSensitiveLabelsArgs( auth_token=read_secret( # reuse this secret :) - project_id=self.slack_secret_project_id, - secret_name=self.slack_token_secret_name, + project_id=self.config.slack_secret_project_id, + secret_name=self.config.slack_token_secret_name, fail_gracefully=False, ), ), description=f'Slack notification channel for {self.topic_name}', - project=self.project_name, + project=self.config.project_name, ) @cached_property @@ 
-228,7 +237,7 @@ def notification_pubsub_topic(self): """ Pubsub topic to trigger send notification message to slack """ - return gcp.pubsub.Topic(self.topic_name, project=self.project_name) + return gcp.pubsub.Topic(self.topic_name, project=self.config.project_name) @cached_property def notification_pubsub_push_subscription(self): @@ -247,13 +256,13 @@ def notification_pubsub_push_subscription(self): push_config=gcp.pubsub.SubscriptionPushConfigArgs( push_endpoint=self.notification_cloudfun.service_config.uri, oidc_token=gcp.pubsub.SubscriptionPushConfigOidcTokenArgs( - service_account_email=self.service_account.email, + service_account_email=self.config.service_account.email, ), attributes={ 'x-goog-version': 'v1', }, ), - project=self.project_name, + project=self.config.project_name, opts=pulumi.ResourceOptions( depends_on=[ self.notification_pubsub_topic, @@ -264,10 +273,11 @@ def notification_pubsub_push_subscription(self): ) # give subscriber permission to service account + project = gcp.organizations.get_project() members = [ pulumi.Output.concat( 'serviceAccount:service-', - self.project_number, + project.number, '@gcp-sa-pubsub.iam.gserviceaccount.com', ) ] @@ -275,7 +285,7 @@ def notification_pubsub_push_subscription(self): # give publisher permission to service account gcp.pubsub.SubscriptionIAMPolicy( f'{self.topic_name}-subscription-iam-policy', - project=self.project_name, + project=self.config.project_name, subscription=subscription.name, policy_data=prepare_policy_data('roles/pubsub.subscriber', members), ) @@ -288,13 +298,14 @@ def notification_dead_letters_pubsub_topic(self): Dead letters pubsub topic to capture failed jobs """ topic = gcp.pubsub.Topic( - f'{self.topic_name}-dead-letters-topic', project=self.project_name + f'{self.topic_name}-dead-letters-topic', project=self.config.project_name ) + project = gcp.organizations.get_project() members = [ pulumi.Output.concat( 'serviceAccount:service-', - self.project_number, + project.number, 
'@gcp-sa-pubsub.iam.gserviceaccount.com',
             )
         ]
@@ -302,7 +313,7 @@ def notification_dead_letters_pubsub_topic(self):
         # give publisher permission to service account
         gcp.pubsub.TopicIAMPolicy(
             f'{self.topic_name}-dead-letters-topic-iam-policy',
-            project=self.project_name,
+            project=self.config.project_name,
             topic=topic.name,
             policy_data=prepare_policy_data('roles/pubsub.publisher', members),
         )
@@ -316,8 +327,8 @@ def notification_dead_letters_pubsub_subscription(self):
         """
         return gcp.pubsub.Subscription(
             f'{self.topic_name}-dead-letters-subscription',
-            topic=self.dead_letters_pubsub_topic.name,
-            project=self.project_name,
+            topic=self.notification_dead_letters_pubsub_topic.name,
+            project=self.config.project_name,
             ack_deadline_seconds=20,
             opts=pulumi.ResourceOptions(
                 depends_on=[self.notification_dead_letters_pubsub_topic]
@@ -344,15 +355,15 @@ def main(self):
         """Main function to setup notification infrastructure"""
         alerts_channel = None
         pubsub_topic = None
-        if (
-            self.notification_type == SlackNotificationType.INCIDENT_ALERT
-            or self.notification_type == SlackNotificationType.BOTH
+        if self.notification_type in (
+            SlackNotificationType.INCIDENT_ALERT,
+            SlackNotificationType.BOTH,
         ):
             alerts_channel = self.setup_incident_alerts_channel()

-        if (
-            self.notification_type == SlackNotificationType.NOTIFICATION
-            or self.notification_type == SlackNotificationType.BOTH
+        if self.notification_type in (
+            SlackNotificationType.NOTIFICATION,
+            SlackNotificationType.BOTH,
+        ):
             self.setup_notification()
             pubsub_topic = self.notification_pubsub_topic