Commit
Added ETL log table, implemented matching of parser to URL path.
milo-hyben committed Sep 7, 2023
1 parent 78109f6 commit 35100fb
Showing 7 changed files with 244 additions and 132 deletions.
1 change: 1 addition & 0 deletions etl/README.md
@@ -24,6 +24,7 @@ gcloud auth application-default login

export PROJECT_NAME='gcp-project-name'
export BIGQUERY_TABLE='$PROJECT_NAME.metamist.etl-data'
export BIGQUERY_LOG_TABLE='$PROJECT_NAME.metamist.etl-logs'
export PUBSUB_TOPIC='projects/$PROJECT_NAME/topics/etl-topic'

# setup to run local version of sample-metadata
26 changes: 26 additions & 0 deletions etl/bq_log_schema.json
@@ -0,0 +1,26 @@
[
{
"name": "request_id",
"type": "STRING",
"mode": "REQUIRED",
"description": "Unique UUID for the row"
},
{
"name": "timestamp",
"type": "TIMESTAMP",
"mode": "REQUIRED",
"description": "Timestamp of processing the row"
},
{
"name": "status",
"type": "STRING",
"mode": "REQUIRED",
"description": "Status of the processing FAILED/SUCCESS"
},
{
"name": "details",
"type": "JSON",
"mode": "REQUIRED",
"description": "Output the processing"
}
]
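A minimal sketch of how this schema could be used to provision the log table, assuming the google-cloud-bigquery client and the table name from the README's BIGQUERY_LOG_TABLE example (the provisioning step itself is not part of this commit):

# Sketch only: create the etl-logs table from bq_log_schema.json.
# Project and table names are assumed from the README example above.
import json

import google.cloud.bigquery as bq

client = bq.Client()
with open('etl/bq_log_schema.json') as f:
    fields = [bq.SchemaField.from_api_repr(field) for field in json.load(f)]
table = bq.Table('gcp-project-name.metamist.etl-logs', schema=fields)
client.create_table(table, exists_ok=True)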
221 changes: 158 additions & 63 deletions etl/load/main.py
@@ -1,5 +1,6 @@
import asyncio
import base64
import datetime
import json
import logging
import os
@@ -8,24 +9,60 @@
import flask
import functions_framework
import google.cloud.bigquery as bq
import metamist.parser.generic_metadata_parser as gmp
import metamist.parser as mp

BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')

# TODO: The following constants are going to change
PARTICIPANT_COL_NAME = 'individual_id'
SAMPLE_ID_COL_NAME = 'sample_id'
SEQ_TYPE_COL_NAME = 'sequencing_type'
SAMPLE_META_MAP = {
'collection_centre': 'centre',
'collection_date': 'collection_date',
'collection_specimen': 'specimen',
BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE')

DEFAULT_PARSER_PARAMS = {
'search_locations': [],
'project': 'milo-dev',
'participant_column': 'individual_id',
'sample_name_column': 'sample_id',
'seq_type_column': 'sequencing_type',
'default_sequencing_type': 'genome',
'default_sample_type': 'blood',
'default_sequencing_technology': 'short-read',
'sample_meta_map': {
'collection_centre': 'centre',
'collection_date': 'collection_date',
'collection_specimen': 'specimen',
},
'participant_meta_map': {},
'assay_meta_map': {},
'qc_meta_map': {},
}

DEFAULT_SEQUENCING_TYPE = 'genome'
DEFAULT_SEQUENCING_TECHNOLOGY = 'short-read'
METAMIST_PROJECT = 'milo-dev'
DEFAULT_SAMPLE_TYPE = 'blood'

def call_parser(parser_obj, row_json):
"""_summary_
Args:
parser_obj (_type_): _description_
row_json (_type_): _description_
Returns:
_type_: _description_
"""
tmp_res = []
tmp_status = []

# GenericMetadataParser from_json is async
# we call it from sync, so we need to wrap it in coroutine
async def run_parser_capture_result(parser_obj, row_data, res, status):
try:
# TODO better error handling
r = await parser_obj.from_json([row_data], confirm=False, dry_run=True)
res.append(r)
status.append('SUCCESS')
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to parse the row {e}')
# add to the output
res.append(e)
status.append('FAILED')

asyncio.run(run_parser_capture_result(parser_obj, row_json, tmp_res, tmp_status))
return tmp_status[0], tmp_res[0]


@functions_framework.http
@@ -79,6 +116,14 @@ def etl_load(request: flask.Request):
'message': f'Missing or empty request_id: {jbody_str}',
}, 400

# prepare parser map
# parser_map = {
# '/gmp/v1': <class 'metamist.parser.generic_metadata_parser.GenericMetadataParser'>,
# '/sfmp/v1': <class 'metamist.parser.sample_file_map_parser.SampleFileMapParser'>,
# '/bbv/v1': bbv.BbvV1Parser, TODO: add bbv parser
# }
parser_map = prepare_parser_map(mp.GenericParser, default_version='v1')

# locate the request_id in bq
query = f"""
SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id
@@ -101,63 +146,55 @@ def etl_load(request: flask.Request):
# should be only one record, look into loading multiple objects in one call?
row_json = None
result = None
status = None
for row in query_job_result:
# example of json record:
# row_json = {
# 'sample_id': '123456',
# 'external_id': 'GRK100311',
# 'individual_id': '608',
# 'sequencing_type': 'exome',
# 'collection_centre': 'KCCG',
# 'collection_date': '2023-08-05T01:39:28.611476',
# 'collection_specimen': 'blood'
# }
# TODO
# Parse row.body -> Model and upload to metamist database
sample_type = row.type
# sample_type should be in the format /ParserName/Version e.g.: /bbv/v1

row_json = json.loads(row.body)
# get config from payload or use default
config_data = row_json.get('config', DEFAULT_PARSER_PARAMS)
# get data from payload or use payload as data
record_data = row_json.get('data', row_json)
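# Illustrative only (keys taken from this file, values assumed): a payload may carry
# an explicit parser config next to the data; otherwise the whole body is treated as data.
# row_json = {
#     'config': {'project': 'milo-dev', 'default_sample_type': 'blood'},
#     'data': {'sample_id': '123456', 'individual_id': '608', 'sequencing_type': 'exome'},
# }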

parser_obj = get_parser_instance(parser_map, sample_type, config_data)
if not parser_obj:
return {
'success': False,
'message': f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}',
}, 412 # Precondition Failed

tmp_res = []

# GenericMetadataParser from_json is async
# we call it from sync, so we need to wrap it in coroutine
async def run_parser_capture_result(res, row_data):
try:
# TODO better error handling
parser = gmp.GenericMetadataParser(
search_locations=[],
project=METAMIST_PROJECT,
participant_column=PARTICIPANT_COL_NAME,
sample_name_column=SAMPLE_ID_COL_NAME,
reads_column=None,
checksum_column=None,
seq_type_column=SEQ_TYPE_COL_NAME,
default_sequencing_type=DEFAULT_SEQUENCING_TYPE,
default_sample_type=DEFAULT_SAMPLE_TYPE,
default_sequencing_technology=DEFAULT_SEQUENCING_TECHNOLOGY,
default_reference_assembly_location=None,
participant_meta_map={},
sample_meta_map=SAMPLE_META_MAP,
assay_meta_map={},
qc_meta_map={},
allow_extra_files_in_search_path=None,
key_map=None,
)
r = await parser.from_json([row_data], confirm=False, dry_run=True)
res.append(r)
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to parse the row {e}')
# add to the output
res.append(e)

asyncio.run(run_parser_capture_result(tmp_res, row_json))
result = tmp_res[0]
# Parse row.body -> Model and upload to metamist database
status, result = call_parser(parser_obj, record_data)

# log results to BIGQUERY_LOG_TABLE
bq_client.insert_rows_json(
BIGQUERY_LOG_TABLE,
[
{
'request_id': request_id,
'timestamp': datetime.datetime.utcnow().isoformat(),
'status': status,
'details': json.dumps({'result': f"'{result}'"}),
}
],
)

if status and status == 'SUCCESS':
return {
'id': request_id,
'record': row_json,
'result': f"'{result}'",
'success': True,
}

# some error happened
return {
'id': request_id,
'record': row_json,
'result': f"'{result}'",
'success': False,
}, 500


def extract_request_id(jbody: Dict[str, Any]) -> str | None:
@@ -195,3 +232,61 @@ def extract_request_id(jbody: Dict[str, Any]) -> str | None:
logging.error(f'Failed to extract request_id from the payload {e}')

return request_id
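The body of extract_request_id stays collapsed in this view; the base64 import at the top of the file suggests it may unwrap Pub/Sub push deliveries. A sketch of the two request-body shapes such a function would plausibly accept (an assumption, not confirmed by this diff):

# Assumed request bodies carrying a request_id; values are placeholders.
import base64
import json

direct_body = {'request_id': 'example-request-id'}  # plain JSON POST

# Standard Pub/Sub push wrapper: the payload is base64-encoded JSON under message.data.
pubsub_body = {
    'message': {
        'data': base64.b64encode(
            json.dumps({'request_id': 'example-request-id'}).encode()
        ).decode(),
    }
}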


def get_parser_instance(
parser_map: dict, sample_type: str | None, init_params: dict | None
) -> object | None:
"""Extract parser name from sample_type
Args:
sample_type (str | None): _description_
Returns:
object | None: _description_
"""
if not sample_type:
return None

parser_class_ = parser_map.get(sample_type, None)
if not parser_class_:
# class not found
return None

# TODO: in case of generic metadata parser, we need to create instance
try:
if init_params:
parser_obj = parser_class_(**init_params)
else:
parser_obj = parser_class_()
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to create parser instance {e}')
return None

return parser_obj


def all_subclasses(cls: object) -> set:
"""Recursively find all subclasses of cls"""
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in all_subclasses(c)]
)


def prepare_parser_map(cls, default_version='v1'):
"""Prepare parser map for the given class
Args:
cls: class to find subclasses of
default_version: version suffix appended to each parser key
Returns:
dict mapping '/parser_code/version' to the parser class
"""
parser_map = {}
for parser_cls in all_subclasses(cls):
parser_code = ''.join(
[ch for ch in str(parser_cls).rsplit('.', maxsplit=1)[-1] if ch.isupper()]
)
parser_map[f'/{parser_code.lower()}/{default_version}'] = parser_cls

return parser_map
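For illustration, a self-contained sketch of the key derivation performed by prepare_parser_map, using a stand-in class (the real parser classes come from metamist.parser):

# Stand-in class; metamist's GenericMetadataParser would produce the same key.
class GenericMetadataParser:
    pass

name = str(GenericMetadataParser).rsplit('.', maxsplit=1)[-1]  # "GenericMetadataParser'>"
code = ''.join(ch for ch in name if ch.isupper())              # 'GMP'
print(f'/{code.lower()}/v1')                                   # '/gmp/v1'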
Binary file modified etl/load/metamist-6.2.0.tar.gz
