Update ETL load to load available parsers dynamically; fully implemented detailed Slack messaging.
milo-hyben committed Sep 13, 2023
1 parent 35100fb commit d2a2e9f
Showing 9 changed files with 717 additions and 139 deletions.
109 changes: 61 additions & 48 deletions etl/load/main.py
@@ -9,29 +9,21 @@
import flask
import functions_framework
import google.cloud.bigquery as bq

# import all public parsers
import metamist.parser as mp
from google.cloud import pubsub_v1

# try to import private parsers
# try:
# import metamist_private.parser as mpp
# except ImportError:
# pass


BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE')

DEFAULT_PARSER_PARAMS = {
'search_locations': [],
'project': 'milo-dev',
'participant_column': 'individual_id',
'sample_name_column': 'sample_id',
'seq_type_column': 'sequencing_type',
'default_sequencing_type': 'sequencing_type',
'default_sample_type': 'blood',
'default_sequencing_technology': 'short-read',
'sample_meta_map': {
'collection_centre': 'centre',
'collection_date': 'collection_date',
'collection_specimen': 'specimen',
},
'participant_meta_map': {},
'assay_meta_map': {},
'qc_meta_map': {},
}
NOTIFICATION_PUBSUB_TOPIC = os.getenv('NOTIFICATION_PUBSUB_TOPIC')


def call_parser(parser_obj, row_json):
@@ -108,7 +100,9 @@ def etl_load(request: flask.Request):
# try to force conversion and if fails just return None
jbody = request.get_json(force=True, silent=True)

request_id = extract_request_id(jbody)
# use delivery_attempt to log the error only once: the minimum number of delivery attempts
# for pub/sub is 5, so we want to avoid creating 5 log records / slack messages per failure
delivery_attempt, request_id = extract_request_id(jbody)
if not request_id:
jbody_str = json.dumps(jbody)
return {
@@ -147,39 +141,56 @@ def etl_load(request: flask.Request):
row_json = None
result = None
status = None
log_record = None
for row in query_job_result:
sample_type = row.type
# sample_type should be in the format /ParserName/Version e.g.: /bbv/v1

row_json = json.loads(row.body)
# get config from payload or use default
config_data = row_json.get('config', DEFAULT_PARSER_PARAMS)
config_data = row_json.get('config')
# get data from payload or use payload as data
record_data = row_json.get('data', row_json)

parser_obj = get_parser_instance(parser_map, sample_type, config_data)
if not parser_obj:
return {
'success': False,
'message': f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}',
}, 412 # Precondition Failed

# Parse row.body -> Model and upload to metamist database
status, result = call_parser(parser_obj, record_data)

# log results to BIGQUERY_LOG_TABLE
bq_client.insert_rows_json(
BIGQUERY_LOG_TABLE,
[
{
'request_id': request_id,
'timestamp': datetime.datetime.utcnow().isoformat(),
'status': status,
'details': json.dumps({'result': f"'{result}'"}),
if parser_obj:
# Parse row.body -> Model and upload to metamist database
status, result = call_parser(parser_obj, record_data)
else:
status = 'FAILED'
result = f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}'

if delivery_attempt == 1:
# log results to BIGQUERY_LOG_TABLE
log_record = {
'request_id': request_id,
'timestamp': datetime.datetime.utcnow().isoformat(),
'status': status,
'details': {
'type': sample_type,
'submitting_user': row.submitting_user,
'result': f"'{result}'", # convert to string
}
],
)

# json.dumps({'result': f"'{result}'"}),
}
bq_client.insert_rows_json(
BIGQUERY_LOG_TABLE,
[log_record],
)

if status == 'FAILED':
# publish to notification pubsub
msg_title = 'Metamist ETL Load Failed'
try:
pubsub_client = pubsub_v1.PublisherClient()
pubsub_client.publish(
NOTIFICATION_PUBSUB_TOPIC,
json.dumps({'title': msg_title} | log_record).encode(),
)
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to publish to pubsub: {e}')

# return success
if status and status == 'SUCCESS':
return {
'id': request_id,
@@ -188,7 +199,7 @@ def etl_load(request: flask.Request):
'success': True,
}

# some error happened
# return error
return {
'id': request_id,
'record': row_json,
@@ -207,20 +218,22 @@ def extract_request_id(jbody: Dict[str, Any]) -> str | None:
tuple[int, str | None]: delivery attempt and ID of object to be loaded
"""
if not jbody:
return None
return None, None

request_id = jbody.get('request_id')
# set default delivery_attempt as 1
delivery_attempt = jbody.get('deliveryAttempt', 1)
if request_id:
return request_id
return delivery_attempt, request_id

# try if payload is from pub/sub function
message = jbody.get('message')
if not message:
return None
return delivery_attempt, None

data = message.get('data')
if not data:
return None
return delivery_attempt, None

# data is not empty, decode and extract request_id
request_id = None
@@ -231,7 +244,7 @@ def extract_request_id(jbody: Dict[str, Any]) -> str | None:
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to extract request_id from the payload {e}')

return request_id
return delivery_attempt, request_id
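For orientation, a minimal sketch of the kind of Pub/Sub push body this function handles on a redelivery. The dead-letter configuration and all field values are assumptions; only 'deliveryAttempt' and message.data are actually read here.

# Sketch only: a hypothetical push body seen by etl_load on a redelivery.
import base64
import json

push_body = {
    'deliveryAttempt': 3,  # > 1, so the failure is not logged or notified again
    'message': {
        'data': base64.b64encode(
            json.dumps({'request_id': 'example-request-id'}).encode()
        ).decode(),
    },
}

delivery_attempt, request_id = extract_request_id(push_body)
# -> (3, 'example-request-id'); etl_load only writes the BigQuery log row and
#    publishes the Slack notification when delivery_attempt == 1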


def get_parser_instance(
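The body of get_parser_instance is collapsed in this diff, so the following is only a sketch of a dynamic lookup consistent with how it is called above: a parser_map assumed to be built from the parsers importable from metamist.parser, keyed by the /ParserName/Version path carried in the record's type column. The function name, key format, and constructor signature below are assumptions.

# Sketch only: the real get_parser_instance is not shown in this diff.
from typing import Any, Dict


def get_parser_instance_sketch(
    parser_map: Dict[str, type],
    sample_type: str | None,
    init_params: Dict[str, Any] | None,
) -> Any | None:
    """Return an instantiated parser for a '/ParserName/Version' key, or None if unknown."""
    if not sample_type:
        return None

    parser_cls = parser_map.get(sample_type)
    if parser_cls is None:
        return None

    # the parser is constructed with the config carried inside the record, if any
    return parser_cls(**init_params) if init_params else parser_cls()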
1 change: 1 addition & 0 deletions etl/load/requirements.txt
@@ -2,5 +2,6 @@ flask
functions_framework
google-cloud-bigquery
google-cloud-logging
google-cloud-pubsub
# will be replaced with metamist once it contains the parser changes
./metamist-6.2.0.tar.gz
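With DEFAULT_PARSER_PARAMS removed from etl/load/main.py, the parser configuration now travels with the record itself: 'config' is handed to get_parser_instance (None if absent) and 'data' is what gets parsed (the whole body if 'data' is absent). A hypothetical record body illustrating that shape, with keys borrowed from the old defaults; every value is illustrative only.

# Hypothetical body of a row in BIGQUERY_TABLE; the separate `type` column
# (e.g. '/bbv/v1') selects the parser.
record_body = {
    'config': {
        'project': 'milo-dev',
        'participant_column': 'individual_id',
        'sample_name_column': 'sample_id',
        'default_sample_type': 'blood',
        'default_sequencing_technology': 'short-read',
    },
    'data': {
        'individual_id': 'IND0001',
        'sample_id': 'SAM0001',
        'sequencing_type': 'genome',
    },
}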
Empty file added etl/notification/__init__.py
Empty file.
131 changes: 131 additions & 0 deletions etl/notification/main.py
@@ -0,0 +1,131 @@
import base64
import json
import logging
import os
from typing import Any, Dict

import flask
import functions_framework
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

SLACK_BOT_TOKEN = os.getenv('SLACK_BOT_TOKEN')
SLACK_CHANNEL = os.getenv('SLACK_CHANNEL')


@functions_framework.http
def etl_notify(request: flask.Request):
"""HTTP Cloud Function for Sending notification to slack channel
This cloud function is setup as subscriber to notification pub/sub topic
Payload message is expected to be in json format and it is passed to slack channel as is
TODO: We will add more formatting as we go along
Args:
request (flask.Request): The request object.
<https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
Returns:
The response text, or any set of values that can be turned into a
Response object using `make_response`
<https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
Note:
For more information on how Flask integrates with Cloud
Functions, see the `Writing HTTP functions` page.
<https://cloud.google.com/functions/docs/writing/http#http_frameworks>
"""

auth = request.authorization
if not auth or not auth.token:
return {'success': False, 'message': 'No auth token provided'}, 401

# the mimetype might not be set, esp. when Pub/Sub pushes from another topic, so
# try to force the conversion; if it fails, get_json just returns None
jbody = request.get_json(force=True, silent=True)

message = decode_message(jbody)
if not message:
jbody_str = json.dumps(jbody)
return {
'success': False,
'message': f'Missing or empty message: {jbody_str}',
}, 400

message_blocks = format_slack(message)
success = None
try:
client = WebClient(token=SLACK_BOT_TOKEN)
response = client.chat_postMessage(channel=SLACK_CHANNEL, blocks=message_blocks)
success = response.get('ok') is True
except SlackApiError as e:
return {
'success': False,
'message': f'Got an error: {e.response["error"]}',
}, 500

if success:
return {'success': True, 'message': 'Message sent'}

return {'success': False, 'message': 'Failed to send message'}, 500


def format_slack(message: Dict[str, Any]) -> Any | None:
"""
Basic Slack message formatting
The message is a JSON object; for the time being all of its keys are passed through to the Slack message
If a title is present it is used as the header of the message
"""
message_sections = []

if 'title' in message:
message_sections.append(
{
'type': 'header',
'text': {'type': 'plain_text', 'text': f'{message.get("title")}'},
}
)
message_sections.append({'type': 'divider'})

for key, value in message.items():
if key in ['title', 'status']:
continue
message_sections.append(
{"type": "section", "text": {"type": "mrkdwn", "text": f"*{key}*: {value}"}}
)

return message_sections


def decode_message(jbody: Dict[str, Any]) -> Any | None:
"""Decode the message from payload
Args:
jbody (Dict[str, Any]): Json body of payload
Returns:
Any | None: Decoded message payload
"""
if not jbody:
return None

# try if payload is from pub/sub function
message = jbody.get('message')
if not message:
return None

data = message.get('data')
if not data:
return None

# data is not empty, decode it and parse the JSON message
data_json = None
try:
data_decoded = base64.b64decode(data)
data_json = json.loads(data_decoded)
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to decode the message data from the payload {e}')

return data_json
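To tie the two functions together, a rough local round-trip of the push envelope etl_notify expects: the failure record published by etl/load/main.py, base64-wrapped under message.data the way a Pub/Sub push delivers it, then decoded and turned into Slack blocks. All field values below are made up.

# Illustrative only; the real payload is published by etl/load/main.py when a
# record fails to parse, and every value here is hypothetical.
import base64
import json

failure_message = {
    'title': 'Metamist ETL Load Failed',
    'request_id': 'example-request-id',
    'timestamp': '2023-09-13T02:15:00',
    'status': 'FAILED',
    'details': {'type': '/bbv/v1', 'submitting_user': 'user@example.org', 'result': "'...'"},
}

# a Pub/Sub push wraps the published bytes as base64 under message.data
push_body = {
    'message': {'data': base64.b64encode(json.dumps(failure_message).encode()).decode()}
}

decoded = decode_message(push_body)  # -> failure_message
blocks = format_slack(decoded)       # -> a header block for the title, a divider,
                                     #    then one mrkdwn section per remaining key
                                     #    ('title' and 'status' are skipped)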
6 changes: 6 additions & 0 deletions etl/notification/requirements.txt
@@ -0,0 +1,6 @@
flask
functions_framework
google-cloud-bigquery
google-cloud-logging
google-cloud-pubsub
slack_sdk