Actioned pull request feedback.
milo-hyben committed Sep 20, 2023
1 parent 4db8f9b commit d884b13
Showing 6 changed files with 220 additions and 137 deletions.
17 changes: 10 additions & 7 deletions etl/README.md
@@ -22,10 +22,10 @@ Please use your personal dev project as `$PROJECT_NAME`.
# setup gcloud authentication
gcloud auth application-default login

export PROJECT_NAME='gcp-project-name'
export BIGQUERY_TABLE='$PROJECT_NAME.metamist.etl-data'
export BIGQUERY_LOG_TABLE='$PROJECT_NAME.metamist.etl-logs'
export PUBSUB_TOPIC='projects/$PROJECT_NAME/topics/etl-topic'
export PROJECT_NAME="gcp-project-name"
export BIGQUERY_TABLE="$PROJECT_NAME.metamist.etl-data"
export BIGQUERY_LOG_TABLE="$PROJECT_NAME.metamist.etl-logs"
export PUBSUB_TOPIC="projects/$PROJECT_NAME/topics/etl-topic"

# setup to run local version of sample-metadata
export SM_ENVIRONMENT=local
@@ -111,7 +111,9 @@ gcloud functions deploy etl_load \
--source=. \
--entry-point=etl_load \
--trigger-http \
--set-env-vars BIGQUERY_TABLE='$PROJECT_NAME.metamist.etl-data'
--no-allow-unauthenticated \
--set-env-vars BIGQUERY_TABLE=$BIGQUERY_TABLE \
--set-env-vars PUBSUB_TOPIC=$PUBSUB_TOPIC
```

```bash
@@ -125,6 +127,7 @@ gcloud functions deploy etl_extract \
--source=. \
--entry-point=etl_post \
--trigger-http \
--set-env-vars BIGQUERY_TABLE='$PROJECT_NAME.metamist.etl-data' \
--set-env-vars PUBSUB_TOPIC='projects/$PROJECT_NAME/topics/my-topic'
--no-allow-unauthenticated \
--set-env-vars BIGQUERY_TABLE=$BIGQUERY_TABLE \
--set-env-vars PUBSUB_TOPIC=$PUBSUB_TOPIC
```
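Both functions are deployed with `--no-allow-unauthenticated`, so requests must carry an identity token for the target function. As a rough sketch (the URL and record payload below are placeholders, not the real endpoint or schema), a caller could submit a record to the extract function like this:

```python
# Illustrative only: the function URL and record payload are placeholders.
# Requires the `requests` and `google-auth` packages and service-account
# credentials (e.g. GOOGLE_APPLICATION_CREDENTIALS) to mint an ID token.
import requests
from google.auth.transport.requests import Request
from google.oauth2 import id_token

FUNCTION_URL = 'https://<region>-<gcp-project-name>.cloudfunctions.net/etl_extract'

# Mint an identity token with the function URL as the audience.
token = id_token.fetch_id_token(Request(), FUNCTION_URL)

response = requests.post(
    FUNCTION_URL,
    json={'sample_id': 'example-123'},  # hypothetical record; schema depends on the parser
    headers={'Authorization': f'Bearer {token}'},
    timeout=60,
)
print(response.status_code, response.json())
```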
205 changes: 116 additions & 89 deletions etl/load/main.py
@@ -1,10 +1,11 @@
import asyncio
import base64
import datetime
from enum import Enum
import json
import logging
import os
from typing import Any, Dict, List
from typing import Any

import flask
import functions_framework
@@ -21,12 +22,21 @@
DEFAULT_LOAD_CONFIG = os.getenv('DEFAULT_LOAD_CONFIG', '{}')


def call_parser(parser_obj, row_json) -> tuple[str, str]:
class ParsingStatus(Enum):
"""
    Enum type to distinguish between success and failure of parsing
"""

SUCCESS = 1
FAILED = 2


def call_parser(parser_obj, row_json) -> tuple[ParsingStatus, str]:
"""
This function calls parser_obj.from_json and returns status and result
"""
tmp_res: List[str] = []
tmp_status: List[str] = []
tmp_res: list[str] = []
tmp_status: list[ParsingStatus] = []

# GenericMetadataParser from_json is async
# we call it from sync, so we need to wrap it in coroutine
@@ -35,86 +45,93 @@ async def run_parser_capture_result(parser_obj, row_data, res, status):
# TODO better error handling
r = await parser_obj.from_json([row_data], confirm=False, dry_run=True)
res.append(r)
status.append('SUCCESS')
status.append(ParsingStatus.SUCCESS)
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to parse: {e}')
# add to the output
res.append(f'Failed to parse: {e}')
status.append('FAILED')
status.append(ParsingStatus.FAILED)

asyncio.run(run_parser_capture_result(parser_obj, row_json, tmp_res, tmp_status))
return tmp_status[0], tmp_res[0]


def process_rows(query_job_result, delivery_attempt, request_id, parser_map, bq_client):
def process_rows(
bq_row: bq.table.Row,
delivery_attempt: int,
request_id: str,
parser_map: dict,
bq_client: bq.Client,
) -> tuple[ParsingStatus, Any, Any]:
"""
Process BQ results rows, should be only one row
"""
status = None
parsing_result = None
row_json = None

for row in query_job_result:
sample_type = row.type
# sample_type should be in the format /ParserName/Version e.g.: /bbv/v1

row_json = json.loads(row.body)
source_type = bq_row.type
# source_type should be in the format /ParserName/Version e.g.: /bbv/v1

row_json = json.loads(bq_row.body)

# get config from payload and merge with the default
config_data = json.loads(DEFAULT_LOAD_CONFIG)
payload_config_data = row_json.get('config')
if payload_config_data:
config_data.update(payload_config_data)

# get data from payload or use payload as data
record_data = row_json.get('data', row_json)

(parser_obj, err_msg) = get_parser_instance(parser_map, source_type, config_data)

if parser_obj:
# Parse bq_row.body -> Model and upload to metamist database
status, parsing_result = call_parser(parser_obj, record_data)
else:
status = ParsingStatus.FAILED
parsing_result = f'Error: {err_msg} when parsing record with id: {request_id}'

if delivery_attempt == 1:
        # log only on the first attempt;
        # pub/sub retries at least 5 times and we do not want to spam slack with errors
log_details = {
'source_type': source_type,
'submitting_user': bq_row.submitting_user,
'result': str(parsing_result),
}

# get config from payload and merge with the default
config_data = json.loads(DEFAULT_LOAD_CONFIG)
payload_config_data = row_json.get('config')
if payload_config_data:
config_data.update(payload_config_data)
# log results to BIGQUERY_LOG_TABLE
log_record = {
'request_id': request_id,
'timestamp': datetime.datetime.utcnow().isoformat(),
'status': status.name,
'details': json.dumps(log_details),
}

# get data from payload or use payload as data
record_data = row_json.get('data', row_json)
if BIGQUERY_LOG_TABLE is None:
logging.error('BIGQUERY_LOG_TABLE is not set')
return status, parsing_result, row_json

(parser_obj, err_msg) = get_parser_instance(
parser_map, sample_type, config_data
errors = bq_client.insert_rows_json(
BIGQUERY_LOG_TABLE,
[log_record],
)
if parser_obj:
# Parse row.body -> Model and upload to metamist database
status, parsing_result = call_parser(parser_obj, record_data)
else:
status = 'FAILED'
parsing_result = (
f'Error: {err_msg} when parsing record with id: {request_id}'
)

if delivery_attempt == 1:
# log only at the first attempt
log_details = {
'type': sample_type,
'submitting_user': row.submitting_user,
'result': f'{parsing_result}',
}

# log results to BIGQUERY_LOG_TABLE
log_record = {
'request_id': request_id,
'timestamp': datetime.datetime.utcnow().isoformat(),
'status': status,
'details': json.dumps(log_details),
}

errors = bq_client.insert_rows_json(
BIGQUERY_LOG_TABLE,
[log_record],
)
if errors:
logging.error(f'Failed to log to BQ: {errors}')

if status == 'FAILED':
# publish to notification pubsub
msg_title = 'Metamist ETL Load Failed'
try:
pubsub_client = pubsub_v1.PublisherClient()
pubsub_client.publish(
NOTIFICATION_PUBSUB_TOPIC,
json.dumps({'title': msg_title} | log_record).encode(),
)
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to publish to pubsub: {e}')
if errors:
logging.error(f'Failed to log to BQ: {errors}')

if NOTIFICATION_PUBSUB_TOPIC is None:
logging.error('NOTIFICATION_PUBSUB_TOPIC is not set')
return status, parsing_result, row_json

if status == ParsingStatus.FAILED:
# publish to notification pubsub
msg_title = 'Metamist ETL Load Failed'
try:
pubsub_client = pubsub_v1.PublisherClient()
pubsub_client.publish(
NOTIFICATION_PUBSUB_TOPIC,
json.dumps({'title': msg_title} | log_record).encode(),
)
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to publish to pubsub: {e}')

return status, parsing_result, row_json

@@ -194,39 +211,49 @@ def etl_load(request: flask.Request):
query_job_result = bq_client.query(query, job_config=job_config).result()

if query_job_result.total_rows == 0:
# Request ID not found
return {
'success': False,
'message': f'Record with id: {request_id} not found',
}, 412 # Precondition Failed

# should be only one record, look into loading multiple objects in one call?
(status, parsing_result, last_record) = process_rows(
query_job_result, delivery_attempt, request_id, parser_map, bq_client
if query_job_result.total_rows > 1:
# This should never happen, Request ID should be unique
return {
'success': False,
'message': f'Multiple Records with the same id: {request_id}',
}, 412 # Precondition Failed

# will be exactly one row (request), row can contain multiple records (samples)
bq_job_result = list(query_job_result)[0]

(status, parsing_result, uploaded_record) = process_rows(
bq_job_result, delivery_attempt, request_id, parser_map, bq_client
)

# return success
if status and status == 'SUCCESS':
if status == ParsingStatus.SUCCESS:
return {
'id': request_id,
'record': last_record,
'result': f'{parsing_result}',
'record': uploaded_record,
'result': str(parsing_result),
'success': True,
}
}, 200

# otherwise return error
return {
'id': request_id,
'record': last_record,
'result': f'{parsing_result}',
'record': uploaded_record,
'result': str(parsing_result),
'success': False,
}, 500
}, 400
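The `extract_request_id` helper below unwraps either a plain JSON body or a Pub/Sub push envelope. Purely for illustration (field values are invented, and the envelope-handling code is not shown in this hunk), the two shapes look roughly like this:

```python
# Illustrative payload shapes only; all values are invented.
import base64
import json

# Direct invocation: request_id sits at the top level of the body.
direct_body = {'request_id': '1234-abcd', 'deliveryAttempt': 1}

# Pub/Sub push envelope: the published message arrives base64-encoded
# under message.data (standard Pub/Sub push format).
push_body = {
    'deliveryAttempt': 2,
    'message': {
        'data': base64.b64encode(
            json.dumps({'request_id': '1234-abcd'}).encode()
        ).decode(),
    },
    'subscription': 'projects/<gcp-project-name>/subscriptions/etl-load-sub',
}
```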


def extract_request_id(jbody: Dict[str, Any]) -> tuple[str | None, str | None]:
def extract_request_id(jbody: dict[str, Any]) -> tuple[int | None, str | None]:
"""Unwrapp request id from the payload
Args:
jbody (Dict[str, Any]): Json body of payload
jbody (dict[str, Any]): Json body of payload
Returns:
str | None: ID of object to be loaded
@@ -236,7 +263,7 @@ def extract_request_id(jbody: Dict[str, Any]) -> tuple[str | None, str | None]:

request_id = jbody.get('request_id')
# set default delivery_attempt as 1
delivery_attempt = jbody.get('deliveryAttempt', 1)
delivery_attempt = int(jbody.get('deliveryAttempt', 1))
if request_id:
return delivery_attempt, request_id

@@ -262,23 +289,23 @@ def extract_request_id(jbody: Dict[str, Any]) -> tuple[str | None, str | None]:


def get_parser_instance(
parser_map: dict, sample_type: str | None, init_params: dict | None
parser_map: dict, source_type: str | None, init_params: dict | None
) -> tuple[object | None, str | None]:
"""Extract parser name from sample_type
"""Extract parser name from source_type
Args:
sample_type (str | None): _description_
source_type (str | None): _description_
Returns:
object | None: _description_
"""
if not sample_type:
return None, 'Empty sample_type'
if not source_type:
return None, 'Empty source_type'

parser_class_ = parser_map.get(sample_type, None)
parser_class_ = parser_map.get(source_type, None)
if not parser_class_:
# class not found
return None, f'Parser for {sample_type} not found'
return None, f'Parser for {source_type} not found'

# TODO: in case of generic metadata parser, we need to create instance
try:
@@ -293,7 +320,7 @@ def get_parser_instance(
return parser_obj, None


def prepare_parser_map():
def prepare_parser_map() -> dict:
"""Prepare parser map
loop through metamist_parser entry points and create map of parsers
"""