Fixed linting issues.
milo-hyben committed Sep 13, 2023
1 parent d2a2e9f commit ebbba83
Showing 6 changed files with 152 additions and 129 deletions.
143 changes: 77 additions & 66 deletions etl/load/main.py
@@ -10,9 +10,10 @@
 import functions_framework
 import google.cloud.bigquery as bq
 
+from google.cloud import pubsub_v1
+
 # import all public parsers
 import metamist.parser as mp
-from google.cloud import pubsub_v1
 
 # try to import private parsers
 # try:
@@ -27,14 +28,8 @@
 
 
 def call_parser(parser_obj, row_json):
-    """_summary_
-
-    Args:
-        parser_obj (_type_): _description_
-        row_json (_type_): _description_
-
-    Returns:
-        _type_: _description_
-    """
+    """
+    This function calls parser_obj.from_json and returns status and result
+    """
     tmp_res = []
     tmp_status = []
@@ -48,7 +43,7 @@ async def run_parser_capture_result(parser_obj, row_data, res, status):
             res.append(r)
             status.append('SUCCESS')
         except Exception as e:  # pylint: disable=broad-exception-caught
-            logging.error(f'Failed to parse the row {e}')
+            logging.error(f'Failed to parse: {e}')
             # add to the output
             res.append(e)
             status.append('FAILED')
@@ -57,6 +52,70 @@ async def run_parser_capture_result(parser_obj, row_data, res, status):
     return tmp_status[0], tmp_res[0]
 
 
+def process_rows(query_job_result, delivery_attempt, request_id, parser_map, bq_client):
+    """
+    Process BQ results rows, should be only one row
+    """
+    status = None
+    parsing_result = None
+    row_json = None
+
+    for row in query_job_result:
+        sample_type = row.type
+        # sample_type should be in the format /ParserName/Version e.g.: /bbv/v1
+
+        row_json = json.loads(row.body)
+        # get config from payload or use default
+        config_data = row_json.get('config')
+        # get data from payload or use payload as data
+        record_data = row_json.get('data', row_json)
+
+        parser_obj = get_parser_instance(parser_map, sample_type, config_data)
+        if parser_obj:
+            # Parse row.body -> Model and upload to metamist database
+            status, parsing_result = call_parser(parser_obj, record_data)
+        else:
+            status = 'FAILED'
+            parsing_result = f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}'
+
+        if delivery_attempt == 1:
+            # log only at the first attempt
+            log_details = {
+                'type': sample_type,
+                'submitting_user': row.submitting_user,
+                'result': f'{parsing_result}',
+            }
+
+            # log results to BIGQUERY_LOG_TABLE
+            log_record = {
+                'request_id': request_id,
+                'timestamp': datetime.datetime.utcnow().isoformat(),
+                'status': status,
+                'details': json.dumps(log_details),
+            }
+
+            errors = bq_client.insert_rows_json(
+                BIGQUERY_LOG_TABLE,
+                [log_record],
+            )
+            if errors:
+                logging.error(f'Failed to log to BQ: {errors}')
+
+            if status == 'FAILED':
+                # publish to notification pubsub
+                msg_title = 'Metamist ETL Load Failed'
+                try:
+                    pubsub_client = pubsub_v1.PublisherClient()
+                    pubsub_client.publish(
+                        NOTIFICATION_PUBSUB_TOPIC,
+                        json.dumps({'title': msg_title} | log_record).encode(),
+                    )
+                except Exception as e:  # pylint: disable=broad-exception-caught
+                    logging.error(f'Failed to publish to pubsub: {e}')
+
+    return status, parsing_result, row_json
+
+
 @functions_framework.http
 def etl_load(request: flask.Request):
     """HTTP Cloud Function for ETL loading records from BQ to MySQL DB
@@ -138,72 +197,24 @@ def etl_load(request: flask.Request):
         }, 412  # Precondition Failed
 
     # should be only one record, look into loading multiple objects in one call?
-    row_json = None
-    result = None
-    status = None
-    log_record = None
-    for row in query_job_result:
-        sample_type = row.type
-        # sample_type should be in the format /ParserName/Version e.g.: /bbv/v1
-
-        row_json = json.loads(row.body)
-        # get config from payload or use default
-        config_data = row_json.get('config')
-        # get data from payload or use payload as data
-        record_data = row_json.get('data', row_json)
-
-        parser_obj = get_parser_instance(parser_map, sample_type, config_data)
-        if parser_obj:
-            # Parse row.body -> Model and upload to metamist database
-            status, result = call_parser(parser_obj, record_data)
-        else:
-            status = 'FAILED'
-            result = f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}'
-
-        if delivery_attempt == 1:
-            # log results to BIGQUERY_LOG_TABLE
-            log_record = {
-                'request_id': request_id,
-                'timestamp': datetime.datetime.utcnow().isoformat(),
-                'status': status,
-                'details': {
-                    'type': sample_type,
-                    'submitting_user': row.submitting_user,
-                    'result': f"'{result}'",  # convert to string
-                }
-                # json.dumps({'result': f"'{result}'"}),
-            }
-            bq_client.insert_rows_json(
-                BIGQUERY_LOG_TABLE,
-                [log_record],
-            )
-
-            if status == 'FAILED':
-                # publish to notification pubsub
-                msg_title = 'Metamist ETL Load Failed'
-                try:
-                    pubsub_client = pubsub_v1.PublisherClient()
-                    pubsub_client.publish(
-                        NOTIFICATION_PUBSUB_TOPIC,
-                        json.dumps({'title': msg_title} | log_record).encode(),
-                    )
-                except Exception as e:  # pylint: disable=broad-exception-caught
-                    logging.error(f'Failed to publish to pubsub: {e}')
+    (status, parsing_result, last_record) = process_rows(
+        query_job_result, delivery_attempt, request_id, parser_map, bq_client
+    )
 
     # return success
     if status and status == 'SUCCESS':
         return {
             'id': request_id,
-            'record': row_json,
-            'result': f"'{result}'",
+            'record': last_record,
+            'result': f'{parsing_result}',
             'success': True,
         }
 
-    # return error
+    # otherwise return error
     return {
         'id': request_id,
-        'record': row_json,
-        'result': f"'{result}'",
+        'record': last_record,
+        'result': f'{parsing_result}',
         'success': False,
     }, 500
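Note: the substantive change in this file is that the per-row parse/log/notify loop moved out of etl_load into the new process_rows helper, which hands back (status, parsing_result, row_json). The payload convention it relies on is small enough to show; the sketch below is illustrative only, and the split_payload helper is hypothetical, not part of the commit:

import json

def split_payload(body: str):
    # Mirror process_rows' unpacking: 'config' is optional parser
    # configuration, 'data' holds the records, and a payload without
    # a 'data' key is treated entirely as data.
    row_json = json.loads(body)
    config_data = row_json.get('config')          # None -> parser defaults
    record_data = row_json.get('data', row_json)  # fall back to whole payload
    return config_data, record_data

# A payload with explicit config and data sections:
config, data = split_payload('{"config": {"project": "demo"}, "data": {"sample": "s1"}}')
assert config == {'project': 'demo'} and data == {'sample': 's1'}

# A bare payload is treated entirely as data:
config, data = split_payload('{"sample": "s2"}')
assert config is None and data == {'sample': 's2'}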
6 changes: 2 additions & 4 deletions etl/notification/main.py
@@ -52,14 +52,12 @@ def etl_notify(request: flask.Request):
         }, 400
 
     # TODO: format message to slack message
-    print(type(message), message)
     message_blocks = format_slack(message)
-    print("message_blocks", message_blocks)
     success = None
     try:
         client = WebClient(token=SLACK_BOT_TOKEN)
         response = client.chat_postMessage(channel=SLACK_CHANNEL, blocks=message_blocks)
-        success = response.get('ok') == True
+        success = response.get('ok') is True
     except SlackApiError as e:
         return {
             'success': False,
@@ -93,7 +91,7 @@ def format_slack(message: Dict[str, Any]) -> Any | None:
         if key in ['title', 'status']:
             continue
         message_sections.append(
-            {"type": "section", "text": {"type": "mrkdwn", "text": f"*{key}*: {value}"}}
+            {'type': 'section', 'text': {'type': 'mrkdwn', 'text': f'*{key}*: {value}'}}
         )
 
     return message_sections
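Note: format_slack turns every message key except 'title' and 'status' into a Slack Block Kit mrkdwn section, and the lint fix above replaces == True with is True. A hedged sketch of the whole flow, assuming placeholder token and channel values (the real ones come from SLACK_BOT_TOKEN and SLACK_CHANNEL):

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

message = {'title': 'Metamist ETL Load Failed', 'status': 'FAILED', 'request_id': 'abc-123'}

# Mirror format_slack: skip 'title'/'status', render the rest as mrkdwn sections
message_blocks = [
    {'type': 'section', 'text': {'type': 'mrkdwn', 'text': f'*{key}*: {value}'}}
    for key, value in message.items()
    if key not in ['title', 'status']
]

success = None
try:
    client = WebClient(token='xoxb-placeholder')
    response = client.chat_postMessage(channel='#etl-alerts', blocks=message_blocks)
    success = response.get('ok') is True  # SlackResponse supports dict-style access
except SlackApiError:
    success = False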
2 changes: 1 addition & 1 deletion etl/notification/requirements.txt
@@ -3,4 +3,4 @@ functions_framework
 google-cloud-bigquery
 google-cloud-logging
 google-cloud-pubsub
-slack_sdk
\ No newline at end of file
+slack_sdk
6 changes: 2 additions & 4 deletions etl/test/test_etl_load.py
@@ -138,9 +138,7 @@ async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client
             {
                 'id': '1234567890',
                 'record': json.loads(ETL_SAMPLE_RECORD_2),
-                'result': ''
-                "'Missing or invalid sample_type: /bbv/v1 in the record with id: 1234567890'"
-                '',
+                'result': 'Missing or invalid sample_type: /bbv/v1 in the record with id: 1234567890',
                 'success': False,
             },
         )
@@ -200,7 +198,7 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client
             {
                 'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836',
                 'record': json.loads(ETL_SAMPLE_RECORD_3),
-                'result': "''",
+                'result': '',
                 'success': True,
             },
         )
27 changes: 16 additions & 11 deletions metamist_infrastructure/driver.py
@@ -13,7 +13,11 @@
 from cpg_infra.plugin import CpgInfrastructurePlugin
 
 # from cpg_infra.utils import archive_folder
-from slack_notification import SlackNotification, SlackNotificationType
+from slack_notification import (
+    SlackNotification,
+    SlackNotificationType,
+    SlackNotificationConfig,
+)
 
 # this gets moved around during the pip install
 ETL_FOLDER = Path(__file__).parent / 'etl'
@@ -57,7 +61,7 @@ def main(self):
         """Driver for the metamist infrastructure as code plugin"""
         # todo, eventually configure metamist cloud run server
         # to be deployed here, but for now it's manually deployed
-        self.setup_etl()
+        self._setup_etl()
 
     @cached_property
     def _svc_cloudresourcemanager(self):
@@ -151,7 +155,7 @@ def etl_service_account(self):
             ),
         )
 
-    def etl_function_account(self, f_name: str):
+    def _etl_function_account(self, f_name: str):
         """
         Service account for cloud function
         """
@@ -167,12 +171,12 @@ def etl_function_account(self, f_name: str):
     @cached_property
     def etl_load_service_account(self):
         """Service account to run load/transform functionality"""
-        return self.etl_function_account('load')
+        return self._etl_function_account('load')
 
     @cached_property
     def etl_extract_service_account(self):
         """Service account to run extract functionality"""
-        return self.etl_function_account('extract')
+        return self._etl_function_account('extract')
 
     @cached_property
     def etl_accessors(self):
@@ -368,7 +372,7 @@ def prepare_service_account_policy_data(self, role):
         ]
     ).policy_data
 
-    def setup_etl(self):
+    def _setup_etl(self):
         """
         setup_etl
         """
@@ -577,18 +581,19 @@ def etl_slack_notification(self):
         Setup Slack notification
         """
 
-        project = gcp.organizations.get_project()
-
-        notification = SlackNotification(
+        slack_config = SlackNotificationConfig(
             project_name=self.config.sample_metadata.gcp.project,
-            project_number=project.number,
             location=self.config.gcp.region,
             service_account=self.etl_service_account,  # can be some other account
             source_bucket=self.source_bucket,
-            topic_name='metamist-etl-notification',
             slack_secret_project_id=self.config.billing.gcp.project_id,
             slack_token_secret_name=self.config.billing.aggregator.slack_token_secret_name,
             slack_channel_name=self.config.sample_metadata.slack_channel,
+        )
+
+        notification = SlackNotification(
+            slack_config=slack_config,
+            topic_name='metamist-etl-notification',
             func_to_monitor=[
                 'metamist-etl-notification-func',
                 'metamist-etl-extract',