diff --git a/etl/load/main.py b/etl/load/main.py
index 3bdfaf8b1..39a939043 100644
--- a/etl/load/main.py
+++ b/etl/load/main.py
@@ -9,29 +9,21 @@
 import flask
 import functions_framework
 import google.cloud.bigquery as bq
+
+# import all public parsers
 import metamist.parser as mp
+from google.cloud import pubsub_v1
+
+# try to import private parsers
+# try:
+#     import metamist_private.parser as mpp
+# except ImportError:
+#     pass
+
 BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
 BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE')
-
-DEFAULT_PARSER_PARAMS = {
-    'search_locations': [],
-    'project': 'milo-dev',
-    'participant_column': 'individual_id',
-    'sample_name_column': 'sample_id',
-    'seq_type_column': 'sequencing_type',
-    'default_sequencing_type': 'sequencing_type',
-    'default_sample_type': 'blood',
-    'default_sequencing_technology': 'short-read',
-    'sample_meta_map': {
-        'collection_centre': 'centre',
-        'collection_date': 'collection_date',
-        'collection_specimen': 'specimen',
-    },
-    'participant_meta_map': {},
-    'assay_meta_map': {},
-    'qc_meta_map': {},
-}
+NOTIFICATION_PUBSUB_TOPIC = os.getenv('NOTIFICATION_PUBSUB_TOPIC')
 
 
 def call_parser(parser_obj, row_json):
@@ -108,7 +100,9 @@ def etl_load(request: flask.Request):
     # try to force conversion and if fails just return None
     jbody = request.get_json(force=True, silent=True)
 
-    request_id = extract_request_id(jbody)
+    # use delivery_attempt to log the error only once: the minimum number of pub/sub
+    # delivery attempts is 5, so we would otherwise emit 5 records / slack messages per error
+    delivery_attempt, request_id = extract_request_id(jbody)
     if not request_id:
         jbody_str = json.dumps(jbody)
         return {
@@ -147,39 +141,56 @@ def etl_load(request: flask.Request):
     row_json = None
     result = None
     status = None
+    log_record = None
     for row in query_job_result:
         sample_type = row.type
         # sample_type should be in the format /ParserName/Version e.g.: /bbv/v1
 
         row_json = json.loads(row.body)
-        # get config from payload or use default
-        config_data = row_json.get('config', DEFAULT_PARSER_PARAMS)
+        # get config from payload if present
+        config_data = row_json.get('config')
         # get data from payload or use payload as data
         record_data = row_json.get('data', row_json)
 
         parser_obj = get_parser_instance(parser_map, sample_type, config_data)
-        if not parser_obj:
-            return {
-                'success': False,
-                'message': f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}',
-            }, 412  # Precondition Failed
-
-        # Parse row.body -> Model and upload to metamist database
-        status, result = call_parser(parser_obj, record_data)
-
-        # log results to BIGQUERY_LOG_TABLE
-        bq_client.insert_rows_json(
-            BIGQUERY_LOG_TABLE,
-            [
-                {
-                    'request_id': request_id,
-                    'timestamp': datetime.datetime.utcnow().isoformat(),
-                    'status': status,
-                    'details': json.dumps({'result': f"'{result}'"}),
-                }
-            ],
-        )
-
+        if parser_obj:
+            # Parse row.body -> Model and upload to metamist database
+            status, result = call_parser(parser_obj, record_data)
+        else:
+            status = 'FAILED'
+            result = f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}'
+
+        if delivery_attempt == 1:
+            # log results to BIGQUERY_LOG_TABLE
+            log_record = {
+                'request_id': request_id,
+                'timestamp': datetime.datetime.utcnow().isoformat(),
+                'status': status,
+                'details': {
+                    'type': sample_type,
+                    'submitting_user': row.submitting_user,
+                    'result': f"'{result}'",  # convert to string
+                },
+            }
+            bq_client.insert_rows_json(
+                BIGQUERY_LOG_TABLE,
+                [log_record],
+            )
+
+            if status == 'FAILED':
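+                # the notification message is the log_record plus a title;
+                # its shape is e.g. (values illustrative only):
+                # {
+                #     'title': 'Metamist ETL Load Failed',
+                #     'request_id': '1234567890',
+                #     'timestamp': '2023-08-05T01:39:28.611476',
+                #     'status': 'FAILED',
+                #     'details': {'type': '/bbv/v1', 'submitting_user': 'user@mail.com', 'result': "'...'"},
+                # }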
+                # publish to notification pubsub
+                msg_title = 'Metamist ETL Load Failed'
+                try:
+                    pubsub_client = pubsub_v1.PublisherClient()
+                    pubsub_client.publish(
+                        NOTIFICATION_PUBSUB_TOPIC,
+                        json.dumps({'title': msg_title} | log_record).encode(),
+                    )
+                except Exception as e:  # pylint: disable=broad-exception-caught
+                    logging.error(f'Failed to publish to pubsub: {e}')
+
+    # return success
     if status and status == 'SUCCESS':
         return {
             'id': request_id,
@@ -188,7 +199,7 @@ def etl_load(request: flask.Request):
             'success': True,
         }
 
-    # some error happened
+    # return error
     return {
         'id': request_id,
         'record': row_json,
@@ -207,20 +218,22 @@ def extract_request_id(jbody: Dict[str, Any]) -> str | None:
-        str | None: ID of object to be loaded
+        tuple[int | None, str | None]: delivery attempt and ID of the object to be loaded
     """
     if not jbody:
-        return None
+        return None, None
 
     request_id = jbody.get('request_id')
+    # set default delivery_attempt as 1
+    delivery_attempt = jbody.get('deliveryAttempt', 1)
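+    # deliveryAttempt is only present on pub/sub push payloads (it is populated when a
+    # dead-letter policy is set); an envelope looks roughly like (illustrative only):
+    # {'deliveryAttempt': 2, 'message': {'data': '<base64-encoded json>'}}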
     if request_id:
-        return request_id
+        return delivery_attempt, request_id
 
     # try if payload is from pub/sub function
     message = jbody.get('message')
     if not message:
-        return None
+        return delivery_attempt, None
 
     data = message.get('data')
     if not data:
-        return None
+        return delivery_attempt, None
 
     # data is not empty, decode and extract request_id
     request_id = None
     try:
@@ -231,7 +244,7 @@ def extract_request_id(jbody: Dict[str, Any]) -> str | None:
     except Exception as e:  # pylint: disable=broad-exception-caught
         logging.error(f'Failed to extract request_id from the payload {e}')
 
-    return request_id
+    return delivery_attempt, request_id
 
 
 def get_parser_instance(
diff --git a/etl/load/requirements.txt b/etl/load/requirements.txt
index 0c953fc64..3e4584ed9 100644
--- a/etl/load/requirements.txt
+++ b/etl/load/requirements.txt
@@ -2,5 +2,6 @@ flask
 functions_framework
 google-cloud-bigquery
 google-cloud-logging
+google-cloud-pubsub
 # will be replaced with metamist once it contains the parser changes
 ./metamist-6.2.0.tar.gz
diff --git a/etl/notification/__init__.py b/etl/notification/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/etl/notification/main.py b/etl/notification/main.py
new file mode 100644
index 000000000..0c90970c3
--- /dev/null
+++ b/etl/notification/main.py
@@ -0,0 +1,131 @@
+import base64
+import json
+import logging
+import os
+from typing import Any, Dict
+
+import flask
+import functions_framework
+from slack_sdk import WebClient
+from slack_sdk.errors import SlackApiError
+
+SLACK_BOT_TOKEN = os.getenv('SLACK_BOT_TOKEN')
+SLACK_CHANNEL = os.getenv('SLACK_CHANNEL')
+
+
+@functions_framework.http
+def etl_notify(request: flask.Request):
+    """HTTP Cloud Function for sending a notification to a slack channel
+
+    This cloud function is set up as a subscriber to the notification pub/sub topic.
+    The payload message is expected to be in json format and is passed to the slack
+    channel as is.
+    TODO: We will add more formatting as we go along
+
+    Args:
+        request (flask.Request): The request object.
+
+    Returns:
+        The response text, or any set of values that can be turned into a
+        Response object using `make_response`.
+
+    Note:
+        For more information on how Flask integrates with Cloud
+        Functions, see the `Writing HTTP functions` page.
+    """
+
+    auth = request.authorization
+    if not auth or not auth.token:
+        return {'success': False, 'message': 'No auth token provided'}, 401
+
+    # the mimetype might not be set, esp. when PubSub is pushing from another topic;
+    # try to force conversion and if it fails just return None
+    jbody = request.get_json(force=True, silent=True)
+
+    message = decode_message(jbody)
+    if not message:
+        jbody_str = json.dumps(jbody)
+        return {
+            'success': False,
+            'message': f'Missing or empty message: {jbody_str}',
+        }, 400
+
+    message_blocks = format_slack(message)
+    success = None
+    try:
+        client = WebClient(token=SLACK_BOT_TOKEN)
+        response = client.chat_postMessage(channel=SLACK_CHANNEL, blocks=message_blocks)
+        success = response.get('ok') is True
+    except SlackApiError as e:
+        return {
+            'success': False,
+            'message': f'Got an error: {e.response["error"]}',
+        }, 500
+
+    if success:
+        return {'success': True, 'message': 'Message sent'}
+
+    return {'success': False, 'message': 'Failed to send message'}, 500
+
+
+def format_slack(message: Dict[str, Any]) -> Any | None:
+    """
+    Basic Slack message formatting
+    The message is a json dict; for the time being we pass all its keys to the message.
+    If a title is present, it is used as the title of the message.
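+
+    Example (illustrative values):
+        format_slack({'title': 'ETL Failed', 'request_id': '123'}) ->
+        [{'type': 'header', 'text': {'type': 'plain_text', 'text': 'ETL Failed'}},
+         {'type': 'divider'},
+         {'type': 'section', 'text': {'type': 'mrkdwn', 'text': '*request_id*: 123'}}]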
"exome", + "collection_centre": "KCCG", + "collection_date": "2023-08-05T01:39:28.611476", + "collection_specimen": "blood" +} +""" + +ETL_SAMPLE_RECORD_3 = """ +{ + "config": + { + "search_locations": [], + "project": "milo-dev", + "participant_column": "individual_id", + "sample_name_column": "sample_id", + "seq_type_column": "sequencing_type", + "default_sequencing_type": "sequencing_type", + "default_sample_type": "blood", + "default_sequencing_technology": "short-read", + "sample_meta_map": + { + "collection_centre": "centre", + "collection_date": "collection_date", + "collection_specimen": "specimen" + }, + "participant_meta_map": {}, + "assay_meta_map": {}, + "qc_meta_map": {} + }, + "data": + { + "sample_id": "123456", + "external_id": "GRK100311", + "individual_id": "608", + "sequencing_type": "exome", + "collection_centre": "KCCG", + "collection_date": "2023-08-05T01:39:28.611476", + "collection_specimen": "blood" + } +} """ @@ -62,9 +112,10 @@ async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client ) request.get_json.return_value = json.loads('{"request_id": "1234567890"}') - query_row = MagicMock(args={}, spec=['body', 'type']) + query_row = MagicMock(args={}, spec=['body', 'type', 'submitting_user']) query_row.body = ETL_SAMPLE_RECORD_2 - query_row.type = '/gmp/v1' + query_row.type = '/bbv/v1' + query_row.submitting_user = 'user@mail.com' query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) query_job_result.total_rows = 1 @@ -78,14 +129,19 @@ async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client call_parser.return_value = ('SUCCESS', '') - response = etl.load.main.etl_load(request) + response, status = etl.load.main.etl_load(request) + + # etl_load will fail as bbv/v1 is invalid parser + self.assertEqual(status, 500) self.assertDictEqual( response, { 'id': '1234567890', 'record': json.loads(ETL_SAMPLE_RECORD_2), - 'result': "''", - 'success': True, + 'result': '' + "'Missing or invalid sample_type: /bbv/v1 in the record with id: 1234567890'" + '', + 'success': False, }, ) @@ -119,9 +175,10 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client request.get_json.return_value = pubsub_payload_example - query_row = MagicMock(args={}, spec=['body', 'type']) - query_row.body = ETL_SAMPLE_RECORD_2 + query_row = MagicMock(args={}, spec=['body', 'type', 'submitting_user']) + query_row.body = ETL_SAMPLE_RECORD_3 query_row.type = '/gmp/v1' + query_row.submitting_user = 'user@mail.com' query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) query_job_result.total_rows = 1 @@ -136,11 +193,13 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client call_parser.return_value = ('SUCCESS', '') response = etl.load.main.etl_load(request) + # etl_load will fail as bbv/v1 is invalid parser + self.assertDictEqual( response, { 'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836', - 'record': json.loads(ETL_SAMPLE_RECORD_2), + 'record': json.loads(ETL_SAMPLE_RECORD_3), 'result': "''", 'success': True, }, diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index e1350cbb1..5ed322dca 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -13,11 +13,11 @@ from cpg_infra.plugin import CpgInfrastructurePlugin # from cpg_infra.utils import archive_folder -from cpg_utils.cloud import read_secret +from slack_notification import SlackNotification, SlackNotificationType # this gets moved around during the pip install 
+    """
+    if not jbody:
+        return None
+
+    # try if payload is from pub/sub function
+    message = jbody.get('message')
+    if not message:
+        return None
+
+    data = message.get('data')
+    if not data:
+        return None
+
+    # data is not empty, decode and extract the message
+    data_json = None
+    try:
+        data_decoded = base64.b64decode(data)
+        data_json = json.loads(data_decoded)
+    except Exception as e:  # pylint: disable=broad-exception-caught
+        logging.error(f'Failed to decode the message from the payload {e}')
+
+    return data_json
diff --git a/etl/notification/requirements.txt b/etl/notification/requirements.txt
new file mode 100644
index 000000000..617edf595
--- /dev/null
+++ b/etl/notification/requirements.txt
@@ -0,0 +1,6 @@
+flask
+functions_framework
+google-cloud-bigquery
+google-cloud-logging
+google-cloud-pubsub
+slack_sdk
\ No newline at end of file
diff --git a/etl/test/test_etl_load.py b/etl/test/test_etl_load.py
index 0b54617ca..218b2bfc4 100644
--- a/etl/test/test_etl_load.py
+++ b/etl/test/test_etl_load.py
@@ -7,11 +7,61 @@
 import metamist.parser as mp
 
 ETL_SAMPLE_RECORD_1 = """
-{"identifier": "AB0002", "name": "j smith", "age": 50, "measurement": "98.7", "observation": "B++", "receipt_date": "1/02/2023"}
+{
+    "identifier": "AB0002",
+    "name": "j smith",
+    "age": 50,
+    "measurement": "98.7",
+    "observation": "B++",
+    "receipt_date": "1/02/2023"
+}
 """
 
 ETL_SAMPLE_RECORD_2 = """
-{"sample_id": "123456", "external_id": "GRK100311", "individual_id": "608", "sequencing_type": "exome", "collection_centre": "KCCG", "collection_date": "2023-08-05T01:39:28.611476", "collection_specimen": "blood"}
+{
+    "sample_id": "123456",
+    "external_id": "GRK100311",
+    "individual_id": "608",
+    "sequencing_type": "exome",
+    "collection_centre": "KCCG",
+    "collection_date": "2023-08-05T01:39:28.611476",
+    "collection_specimen": "blood"
+}
+"""
+
+ETL_SAMPLE_RECORD_3 = """
+{
+    "config": {
+        "search_locations": [],
+        "project": "milo-dev",
+        "participant_column": "individual_id",
+        "sample_name_column": "sample_id",
+        "seq_type_column": "sequencing_type",
+        "default_sequencing_type": "sequencing_type",
+        "default_sample_type": "blood",
+        "default_sequencing_technology": "short-read",
+        "sample_meta_map": {
+            "collection_centre": "centre",
+            "collection_date": "collection_date",
+            "collection_specimen": "specimen"
+        },
+        "participant_meta_map": {},
+        "assay_meta_map": {},
+        "qc_meta_map": {}
+    },
+    "data": {
+        "sample_id": "123456",
+        "external_id": "GRK100311",
+        "individual_id": "608",
+        "sequencing_type": "exome",
+        "collection_centre": "KCCG",
+        "collection_date": "2023-08-05T01:39:28.611476",
+        "collection_specimen": "blood"
+    }
+}
 """
 
@@ -62,9 +112,10 @@ async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client
         )
         request.get_json.return_value = json.loads('{"request_id": "1234567890"}')
 
-        query_row = MagicMock(args={}, spec=['body', 'type'])
+        query_row = MagicMock(args={}, spec=['body', 'type', 'submitting_user'])
         query_row.body = ETL_SAMPLE_RECORD_2
-        query_row.type = '/gmp/v1'
+        query_row.type = '/bbv/v1'
+        query_row.submitting_user = 'user@mail.com'
 
         query_job_result = MagicMock(args={}, spec=['__iter__', '__next__'])
         query_job_result.total_rows = 1
@@ -78,14 +129,19 @@ async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client
 
         call_parser.return_value = ('SUCCESS', '')
 
-        response = etl.load.main.etl_load(request)
+        response, status = etl.load.main.etl_load(request)
+
+        # etl_load will fail as /bbv/v1 is not a valid parser
+        self.assertEqual(status, 500)
 
         self.assertDictEqual(
             response,
             {
                 'id': '1234567890',
                 'record': json.loads(ETL_SAMPLE_RECORD_2),
-                'result': "''",
-                'success': True,
+                'result': "'Missing or invalid sample_type: /bbv/v1 in the record with id: 1234567890'",
+                'success': False,
             },
         )
 
@@ -119,9 +175,10 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client
 
         request.get_json.return_value = pubsub_payload_example
 
-        query_row = MagicMock(args={}, spec=['body', 'type'])
-        query_row.body = ETL_SAMPLE_RECORD_2
+        query_row = MagicMock(args={}, spec=['body', 'type', 'submitting_user'])
+        query_row.body = ETL_SAMPLE_RECORD_3
         query_row.type = '/gmp/v1'
+        query_row.submitting_user = 'user@mail.com'
 
         query_job_result = MagicMock(args={}, spec=['__iter__', '__next__'])
         query_job_result.total_rows = 1
@@ -136,11 +193,13 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client
 
         call_parser.return_value = ('SUCCESS', '')
 
         response = etl.load.main.etl_load(request)
+
+        # with a valid /gmp/v1 parser and config in the payload, the load succeeds
         self.assertDictEqual(
             response,
             {
                 'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836',
-                'record': json.loads(ETL_SAMPLE_RECORD_2),
+                'record': json.loads(ETL_SAMPLE_RECORD_3),
                 'result': "''",
                 'success': True,
             },
         )
diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py
index e1350cbb1..5ed322dca 100644
--- a/metamist_infrastructure/driver.py
+++ b/metamist_infrastructure/driver.py
@@ -13,11 +13,11 @@
 from cpg_infra.plugin import CpgInfrastructurePlugin
 
 # from cpg_infra.utils import archive_folder
-from cpg_utils.cloud import read_secret
+from slack_notification import SlackNotification, SlackNotificationType
 
 # this gets moved around during the pip install
-# ETL_FOLDER = Path(__file__).parent / 'etl'
-ETL_FOLDER = Path(__file__).parent.parent / 'etl'
+ETL_FOLDER = Path(__file__).parent / 'etl'
+# ETL_FOLDER = Path(__file__).parent.parent / 'etl'
 PATH_TO_ETL_BQ_SCHEMA = ETL_FOLDER / 'bq_schema.json'
 PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / 'bq_log_schema.json'
 
@@ -253,6 +253,7 @@ def etl_pubsub_push_subscription(self):
                     self.etl_pubsub_topic,
                     self.etl_load_function,
                     self.etl_pubsub_dead_letter_subscription,
+                    self.etl_extract_service_account,
                 ]
             ),
         )
@@ -340,32 +341,6 @@ def etl_bigquery_log_table(self):
         """
         return self._setup_bq_table(PATH_TO_ETL_BQ_LOG_SCHEMA, 'logs')
 
-    @cached_property
-    def slack_channel(self):
-        """
-        Create a Slack notification channel for all functions
-        Use cli command below to retrieve the required 'labels'
-        $ gcloud beta monitoring channel-descriptors describe slack
-        """
-        if not self.config.sample_metadata.slack_channel:
-            return None
-        return gcp.monitoring.NotificationChannel(
-            'metamist-etl-slack-notification-channel',
-            display_name='Metamist ETL Slack Notification Channel',
-            type='slack',
-            labels={'channel_name': self.config.sample_metadata.slack_channel},
-            sensitive_labels=gcp.monitoring.NotificationChannelSensitiveLabelsArgs(
-                auth_token=read_secret(
-                    # reuse this secret :)
-                    project_id=self.config.billing.gcp.project_id,
-                    secret_name=self.config.billing.aggregator.slack_token_secret_name,
-                    fail_gracefully=False,
-                ),
-            ),
-            description='Slack notification channel for all cost aggregator functions',
-            project=self.config.sample_metadata.gcp.project,
-        )
-
     def prepare_service_account_policy_data(self, role):
         """
         Prepare gcp service account policy, to be used in the pubsub subscription
@@ -423,19 +398,41 @@ def setup_etl(self):
         )
         # give the etl_extract_service_account ability to push to pub/sub
         gcp.projects.IAMMember(
-            'metamist-etl-editor-role',
+            'metamist-etl-extract-editor-role',
             project=self.config.sample_metadata.gcp.project,
             role='roles/editor',
             member=pulumi.Output.concat(
                 'serviceAccount:', self.etl_extract_service_account.email
             ),
         )
+        # give the etl_load_service_account ability to push to pub/sub
+        gcp.projects.IAMMember(
+            'metamist-etl-load-editor-role',
+            project=self.config.sample_metadata.gcp.project,
+            role='roles/editor',
+            member=pulumi.Output.concat(
+                'serviceAccount:', self.etl_load_service_account.email
+            ),
+        )
+
+        # give the serverless-robot-prod.iam.gserviceaccount.com service agent
+        # the ability to set up the cloud run service
+        project = gcp.organizations.get_project()
+        robot_account = pulumi.Output.concat(
+            'serviceAccount:service-',
+            project.number,
+            '@serverless-robot-prod.iam.gserviceaccount.com',
+        )
+        gcp.projects.IAMMember(
+            'metamist-etl-robot-service-agent-role',
+            project=self.config.sample_metadata.gcp.project,
+            role='roles/run.serviceAgent',
+            member=robot_account,
+        )
 
         self._setup_etl_functions()
         self._setup_etl_pubsub()
 
         self._setup_metamist_etl_accessors()
-        self._setup_slack_notification()
 
     def _setup_etl_functions(self):
         """
@@ -460,14 +457,14 @@ def _setup_etl_pubsub(self):
     @cached_property
     def etl_extract_function(self):
         """etl_extract_function"""
-        return self._etl_function('extract', self.etl_extract_service_account.email)
+        return self._etl_function('extract', self.etl_extract_service_account)
 
     @cached_property
     def etl_load_function(self):
         """etl_load_function"""
-        return self._etl_function('load', self.etl_load_service_account.email)
+        return self._etl_function('load', self.etl_load_service_account)
 
-    def _etl_function(self, f_name: str, sa_email: str):
+    def _etl_function(self, f_name: str, sa: object):
         """
         Driver function to setup the etl cloud function
         """
@@ -509,7 +506,7 @@ def _etl_function(self, f_name: str, sa_email: str):
                 ),
             ),
             service_config=gcp.cloudfunctionsv2.FunctionServiceConfigArgs(
-                max_instance_count=1,
+                max_instance_count=1,  # keep max instances at 1 to avoid race conditions
                 min_instance_count=0,
                 available_memory='2Gi',
                 available_cpu=1,
@@ -531,16 +528,24 @@ def _etl_function(self, f_name: str, sa_email: str):
                         self.etl_bigquery_log_table.table_id,
                     ),
                     'PUBSUB_TOPIC': self.etl_pubsub_topic.id,
+                    'NOTIFICATION_PUBSUB_TOPIC': self.etl_slack_notification_topic.id
+                    if self.etl_slack_notification_topic
+                    else '',
                     'SM_ENVIRONMENT': 'DEVELOPMENT',  # TODO: make it configurable
                 },
                 ingress_settings='ALLOW_ALL',
                 all_traffic_on_latest_revision=True,
-                service_account_email=sa_email,
+                service_account_email=sa.email,
            ),
            project=self.config.sample_metadata.gcp.project,
            location=self.config.gcp.region,
            opts=pulumi.ResourceOptions(
-                depends_on=[self._svc_functions, self._svc_build]
+                depends_on=[
+                    self._svc_functions,
+                    self._svc_build,
+                    sa,
+                    self.etl_slack_notification_topic,
+                ]
            ),
        )
 
@@ -566,52 +571,50 @@ def _setup_metamist_etl_accessors(self):
             member=pulumi.Output.concat('serviceAccount:', sa.email),
         )
 
-    def _setup_function_slack_notification(self, etl_fun_name: str):
+    @cached_property
+    def etl_slack_notification(self):
         """
-        setup slack notification for etl_fun cloud function
+        Setup the Slack notification
         """
-        etl_fun = getattr(self, f'etl_{etl_fun_name}_function')
-
-        # Slack notifications
-        filter_string = etl_fun.name.apply(
-            lambda fxn_name: f"""
-                resource.type="cloud_run_revision"
-                AND resource.labels.service_name="{fxn_name}"
-                AND severity>=WARNING
-            """  # noqa: B028
-        )
-        # Create the Cloud Function's event alert
-        alert_condition = gcp.monitoring.AlertPolicyConditionArgs(
-            condition_matched_log=(
-                gcp.monitoring.AlertPolicyConditionConditionMatchedLogArgs(
-                    filter=filter_string,
-                )
-            ),
-            display_name='Function warning/error',
-        )
-        gcp.monitoring.AlertPolicy(
-            resource_name=f'metamist-etl-{etl_fun_name}-alert-policy',
-            display_name=f'Metamist ETL {etl_fun_name.capitalize()} Function Error Alert',
-            combiner='OR',
-            notification_channels=[self.slack_channel.id],
-            conditions=[alert_condition],
-            alert_strategy=gcp.monitoring.AlertPolicyAlertStrategyArgs(
-                notification_rate_limit=(
-                    gcp.monitoring.AlertPolicyAlertStrategyNotificationRateLimitArgs(
-                        # One notification per 5 minutes
-                        period='300s'
-                    )
-                ),
-                # Autoclose Incident after 30 minutes
-                auto_close='1800s',
-            ),
-            opts=pulumi.ResourceOptions(depends_on=[etl_fun]),
+        project = gcp.organizations.get_project()
+
+        notification = SlackNotification(
+            project_name=self.config.sample_metadata.gcp.project,
+            project_number=project.number,
+            location=self.config.gcp.region,
+            service_account=self.etl_service_account,  # can be some other account
+            source_bucket=self.source_bucket,
+            topic_name='metamist-etl-notification',
+            slack_secret_project_id=self.config.billing.gcp.project_id,
+            slack_token_secret_name=self.config.billing.aggregator.slack_token_secret_name,
+            slack_channel_name=self.config.sample_metadata.slack_channel,
+            func_to_monitor=[
+                'metamist-etl-notification-func',
+                'metamist-etl-extract',
+                'metamist-etl-load',
+            ],
+            notification_type=SlackNotificationType.NOTIFICATION,
+            depends_on=[
+                self._svc_iam,
+                self._svc_functions,
+                self._svc_build,
+                self._svc_pubsub,
+                self.etl_service_account,
+            ],
         )
 
-    def _setup_slack_notification(self):
-        if self.slack_channel is None:
-            return
+        # setup the notification and return (alerts_channel, pubsub_topic)
+        return notification.main()
+
+    @cached_property
+    def etl_slack_notification_channel(self):
+        """etl_slack_notification_channel"""
+        (alerts_channel, _) = self.etl_slack_notification
+        return alerts_channel
 
-        self._setup_function_slack_notification('extract')
-        self._setup_function_slack_notification('load')
+    @cached_property
+    def etl_slack_notification_topic(self):
+        """etl_slack_notification_topic"""
+        (_, pubsub_topic) = self.etl_slack_notification
+        return pubsub_topic
diff --git a/metamist_infrastructure/setup.py b/metamist_infrastructure/setup.py
index 116e27860..3a231821c 100644
--- a/metamist_infrastructure/setup.py
+++ b/metamist_infrastructure/setup.py
@@ -26,6 +26,7 @@
         'metamist_infrastructure.etl',
         'metamist_infrastructure.etl.extract',
         'metamist_infrastructure.etl.load',
+        'metamist_infrastructure.etl.notification',
     ],
     package_dir={
         # files in THIS directory are included as metamist_infrastructure
@@ -36,6 +37,8 @@
         'metamist_infrastructure.etl.extract': '../etl/extract',
         # files in ../etl/load are included as metamist_infrastructure.etl.load
         'metamist_infrastructure.etl.load': '../etl/load',
+        # files in ../etl/notification are included as metamist_infrastructure.etl.notification
+        'metamist_infrastructure.etl.notification': '../etl/notification',
     },
     package_data={
         # ensure bq_schema.json is included in etl
@@ -43,7 +46,9 @@
         # ensure requirements.txt is included in etl.extract
         'metamist_infrastructure.etl.extract': ['*.txt'],
         # ensure requirements.txt is included in etl.load
-        'metamist_infrastructure.etl.load': ['*.txt'],
+        'metamist_infrastructure.etl.load': ['*.txt', '*.tar.gz'],
+        # ensure requirements.txt is included in etl.notification
+        'metamist_infrastructure.etl.notification': ['*.txt'],
     },
     install_requires=[],
     entry_points={
+""" +from enum import Enum +from functools import cached_property +from pathlib import Path + +import pulumi +import pulumi_gcp as gcp +from cpg_infra.utils import archive_folder +from cpg_utils.cloud import read_secret + +# this gets moved around during the pip install +# ETL_FOLDER = Path(__file__).parent / 'etl' +ETL_FOLDER = Path(__file__).parent.parent / 'etl' +PATH_TO_ETL_NOTIFICATION = ETL_FOLDER / 'notification' + + +def prepare_policy_data(role, members): + """ + Prepare policy policy for pub/sub subscription and topic + """ + return gcp.organizations.get_iam_policy( + bindings=[gcp.organizations.GetIAMPolicyBindingArgs(role=role, members=members)] + ).policy_data + + +class SlackNotificationType(Enum): + """ + Enum type to distinguish between alert and notification: + + INCIDENT_ALERT - is a notification that requires action, + very generic GC Monitorin alert, which batch multiple incidents/errors into one slack message + It monitors logs for errors/warnings and sends notification to slack channel + "resources_to_monitor" is a list of resource types to monitor for errors/warnings + + NOTIFICATION - is a notification that sends details about the event to the slack channel + more granualar and with more details, it requires error messages to be pushed to pubsub topic. + It monitors pubsub topic for new messages and sends notification to slack channel + "topic_name" is the name of the pubsub topic to monitor for new messages + + BOTH - setup both alert and notification + """ + + INCIDENT_ALERT = 1 + NOTIFICATION = 2 + BOTH = 3 + + +class SlackNotification: + """ + Metamist Infrastructure Notification Slack Plugin + """ + + def __init__( + self, + project_name: str, + project_number: str, + location: str, # e.g. self.config.gcp.region + service_account: object, + source_bucket: object, + topic_name: str, # e.g. 
+    """
+
+    INCIDENT_ALERT = 1
+    NOTIFICATION = 2
+    BOTH = 3
+
+
+class SlackNotification:
+    """
+    Metamist Infrastructure Notification Slack Plugin
+    """
+
+    def __init__(
+        self,
+        project_name: str,
+        project_number: str,
+        location: str,  # e.g. self.config.gcp.region
+        service_account: object,
+        source_bucket: object,
+        topic_name: str,  # e.g. 'metamist-etl-notification'
+        slack_secret_project_id: str,
+        slack_token_secret_name: str,
+        slack_channel_name: str,
+        func_to_monitor: list | None,
+        notification_type: SlackNotificationType,
+        depends_on: list | None,
+    ):
+        """Slack notification constructor"""
+        self.project_name = project_name
+        self.project_number = project_number
+        self.location = location
+        self.service_account = service_account
+        self.source_bucket = source_bucket
+        self.topic_name = topic_name
+
+        self.slack_secret_project_id = slack_secret_project_id
+        self.slack_token_secret_name = slack_token_secret_name
+        self.slack_channel_name = slack_channel_name
+
+        self.func_to_monitor = func_to_monitor
+        self.notification_type = notification_type
+        self.depends_on = depends_on
+
+    def _setup_notification_permissions(self):
+        """Setup permissions for the service account"""
+        gcp.projects.IAMMember(
+            f'{self.topic_name}-sa-run-invoker-role',
+            project=self.project_name,
+            role='roles/run.invoker',
+            member=pulumi.Output.concat('serviceAccount:', self.service_account.email),
+        )
+
+    def _incident_setup_alerts_slack_notification(self):
+        """
+        Set up a slack alert policy for each function in self.func_to_monitor
+        """
+        for func_name in self.func_to_monitor:
+            # Slack notifications
+            filter_string = f"""
+                resource.type="cloud_run_revision"
+                AND resource.labels.service_name="{func_name}"
+                AND severity>=WARNING
+            """
+
+            # Create the Cloud Function's event alert
+            alert_condition = gcp.monitoring.AlertPolicyConditionArgs(
+                condition_matched_log=(
+                    gcp.monitoring.AlertPolicyConditionConditionMatchedLogArgs(
+                        filter=filter_string,
+                    )
+                ),
+                display_name=f'Warning/error for {self.topic_name} {func_name}',
+            )
+            gcp.monitoring.AlertPolicy(
+                resource_name=f'{self.topic_name}-{func_name}-alert-policy',
+                display_name=f'{self.topic_name.capitalize()} {func_name} Error Alert',
+                combiner='OR',
+                notification_channels=[self.incident_alerts_channel.id],
+                conditions=[alert_condition],
+                alert_strategy=gcp.monitoring.AlertPolicyAlertStrategyArgs(
+                    notification_rate_limit=(
+                        gcp.monitoring.AlertPolicyAlertStrategyNotificationRateLimitArgs(
+                            # One notification per 5 minutes
+                            period='300s'
+                        )
+                    ),
+                    # Autoclose the incident after 30 minutes
+                    auto_close='1800s',
+                ),
+            )
+
+    @cached_property
+    def notification_cloudfun(self):
+        """
+        Driver function to set up the notification cloud function
+        """
+
+        # The Cloud Function source code itself needs to be zipped up into an
+        # archive, which we create using the pulumi.AssetArchive primitive.
+        archive = archive_folder(
+            str(PATH_TO_ETL_NOTIFICATION.absolute()),
+            allowed_extensions=frozenset({'.gz', '.py', '.txt', '.json'}),
+        )
+
+        # Create the single Cloud Storage object, which contains the source code
+        source_archive_object = gcp.storage.BucketObject(
+            f'{self.topic_name}-func-source-code',
+            # updating the source archive object does not trigger the cloud
+            # function to actually update the source, because it is based on
+            # the name; allow Pulumi to create a new name each time it gets updated
+            bucket=self.source_bucket.name,
+            source=archive,
+            opts=pulumi.ResourceOptions(replace_on_changes=['*']),
+        )
+
+        fxn = gcp.cloudfunctionsv2.Function(
+            f'{self.topic_name}-func',
+            name=f'{self.topic_name}-func',
+            build_config=gcp.cloudfunctionsv2.FunctionBuildConfigArgs(
+                runtime='python311',
+                entry_point='etl_notify',
+                environment_variables={},
+                source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs(
+                    storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs(
+                        bucket=self.source_bucket.name,
+                        object=source_archive_object.name,
+                    ),
+                ),
+            ),
+            service_config=gcp.cloudfunctionsv2.FunctionServiceConfigArgs(
+                max_instance_count=1,  # keep max instances at 1 to avoid race conditions
+                min_instance_count=0,
+                available_memory='2Gi',
+                available_cpu=1,
+                timeout_seconds=540,
+                environment_variables={
+                    'SLACK_BOT_TOKEN': read_secret(
+                        # reuse this secret :)
+                        project_id=self.slack_secret_project_id,
+                        secret_name=self.slack_token_secret_name,
+                        fail_gracefully=False,
+                    ),
+                    'SLACK_CHANNEL': self.slack_channel_name,
+                },
+                ingress_settings='ALLOW_ALL',
+                all_traffic_on_latest_revision=True,
+                service_account_email=self.service_account.email,
+            ),
+            project=self.project_name,
+            location=self.location,
+            opts=pulumi.ResourceOptions(depends_on=self.depends_on),
+        )
+        return fxn
+
+    @cached_property
+    def incident_alerts_channel(self):
+        """
+        Create a Slack notification channel for all functions
+        Use the cli command below to retrieve the required 'labels':
+        $ gcloud beta monitoring channel-descriptors describe slack
+        """
+        return gcp.monitoring.NotificationChannel(
+            f'{self.topic_name}-incidents-channel',
+            display_name=f'{self.topic_name} incidents slack notification channel',
+            type='slack',
+            labels={'channel_name': self.slack_channel_name},
+            sensitive_labels=gcp.monitoring.NotificationChannelSensitiveLabelsArgs(
+                auth_token=read_secret(
+                    # reuse this secret :)
+                    project_id=self.slack_secret_project_id,
+                    secret_name=self.slack_token_secret_name,
+                    fail_gracefully=False,
+                ),
+            ),
+            description=f'Slack notification channel for {self.topic_name}',
+            project=self.project_name,
+        )
+
+    @cached_property
+    def notification_pubsub_topic(self):
+        """
+        Pubsub topic; new messages published here trigger the slack notification
+        """
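+        # producers publish JSON messages to this topic, mirroring etl/load/main.py;
+        # a sketch only (etl_load publishes its log_record this way):
+        #   pubsub_v1.PublisherClient().publish(topic.id, json.dumps({'title': ...}).encode())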
+        return gcp.pubsub.Topic(self.topic_name, project=self.project_name)
+
+    @cached_property
+    def notification_pubsub_push_subscription(self):
+        """
+        Pubsub push subscription to the topic;
+        new messages on the topic trigger the slack notification function
+        """
+        subscription = gcp.pubsub.Subscription(
+            f'{self.topic_name}-subscription',
+            topic=self.notification_pubsub_topic.name,
+            ack_deadline_seconds=20,
+            dead_letter_policy=gcp.pubsub.SubscriptionDeadLetterPolicyArgs(
+                dead_letter_topic=self.notification_dead_letters_pubsub_topic.id,
+                max_delivery_attempts=5,
+            ),
+            push_config=gcp.pubsub.SubscriptionPushConfigArgs(
+                push_endpoint=self.notification_cloudfun.service_config.uri,
+                oidc_token=gcp.pubsub.SubscriptionPushConfigOidcTokenArgs(
+                    service_account_email=self.service_account.email,
+                ),
+                attributes={
+                    'x-goog-version': 'v1',
+                },
+            ),
+            project=self.project_name,
+            opts=pulumi.ResourceOptions(
+                depends_on=[
+                    self.notification_pubsub_topic,
+                    self.notification_cloudfun,
+                    self.notification_dead_letters_pubsub_topic,
+                ]
+            ),
+        )
+
+        # give the pub/sub service agent subscriber permission on the subscription
+        members = [
+            pulumi.Output.concat(
+                'serviceAccount:service-',
+                self.project_number,
+                '@gcp-sa-pubsub.iam.gserviceaccount.com',
+            )
+        ]
+        gcp.pubsub.SubscriptionIAMPolicy(
+            f'{self.topic_name}-subscription-iam-policy',
+            project=self.project_name,
+            subscription=subscription.name,
+            policy_data=prepare_policy_data('roles/pubsub.subscriber', members),
+        )
+
+        return subscription
+
+    @cached_property
+    def notification_dead_letters_pubsub_topic(self):
+        """
+        Dead letters pubsub topic to capture failed jobs
+        """
+        topic = gcp.pubsub.Topic(
+            f'{self.topic_name}-dead-letters-topic', project=self.project_name
+        )
+
+        members = [
+            pulumi.Output.concat(
+                'serviceAccount:service-',
+                self.project_number,
+                '@gcp-sa-pubsub.iam.gserviceaccount.com',
+            )
+        ]
+
+        # give the pub/sub service agent publisher permission on the topic
+        gcp.pubsub.TopicIAMPolicy(
+            f'{self.topic_name}-dead-letters-topic-iam-policy',
+            project=self.project_name,
+            topic=topic.name,
+            policy_data=prepare_policy_data('roles/pubsub.publisher', members),
+        )
+
+        return topic
+
+    @cached_property
+    def notification_dead_letters_pubsub_subscription(self):
+        """
+        Dead letter subscription
+        """
+        return gcp.pubsub.Subscription(
+            f'{self.topic_name}-dead-letters-subscription',
+            topic=self.notification_dead_letters_pubsub_topic.name,
+            project=self.project_name,
+            ack_deadline_seconds=20,
+            opts=pulumi.ResourceOptions(
+                depends_on=[self.notification_dead_letters_pubsub_topic]
+            ),
+        )
+
+    def setup_notification(self):
+        """Setup the notification path: send the event details to the slack channel.
+        This notification allows more customisation of the message than the generic gcp alerts.
+        """
+        self._setup_notification_permissions()
+
+        # now hook the service into the topic as a subscription;
+        # depends on self.notification_cloudfun
+        return self.notification_pubsub_push_subscription
+
+    def setup_incident_alerts_channel(self):
+        """Setup monitoring alerts: monitor the logs for errors, batch them and
+        report them to the slack channel. Action has to be taken if there are errors in the logs.
+        """
+        self._incident_setup_alerts_slack_notification()
+        return self.incident_alerts_channel.name
+
+    def main(self):
+        """Main function to set up the notification infrastructure"""
+        alerts_channel = None
+        pubsub_topic = None
+        if self.notification_type in (
+            SlackNotificationType.INCIDENT_ALERT,
+            SlackNotificationType.BOTH,
+        ):
+            alerts_channel = self.setup_incident_alerts_channel()
+
+        if self.notification_type in (
+            SlackNotificationType.NOTIFICATION,
+            SlackNotificationType.BOTH,
+        ):
+            self.setup_notification()
+            pubsub_topic = self.notification_pubsub_topic
+
+        return (alerts_channel, pubsub_topic)