diff --git a/etl/README.md b/etl/README.md
index 9328bca30..e5a5cc10d 100644
--- a/etl/README.md
+++ b/etl/README.md
@@ -24,6 +24,7 @@ gcloud auth application-default login
 export PROJECT_NAME='gcp-project-name'
 export BIGQUERY_TABLE='$PROJECT_NAME.metamist.etl-data'
+export BIGQUERY_LOG_TABLE='$PROJECT_NAME.metamist.etl-logs'
 export PUBSUB_TOPIC='projects/$PROJECT_NAME/topics/etl-topic'
 
 # setup to run local version of sample-metadata
diff --git a/etl/bq_log_schema.json b/etl/bq_log_schema.json
new file mode 100644
index 000000000..92d9e37c9
--- /dev/null
+++ b/etl/bq_log_schema.json
@@ -0,0 +1,26 @@
+[
+    {
+        "name": "request_id",
+        "type": "STRING",
+        "mode": "REQUIRED",
+        "description": "Unique UUID for the row"
+    },
+    {
+        "name": "timestamp",
+        "type": "TIMESTAMP",
+        "mode": "REQUIRED",
+        "description": "Timestamp of when the row was processed"
+    },
+    {
+        "name": "status",
+        "type": "STRING",
+        "mode": "REQUIRED",
+        "description": "Status of the processing: FAILED or SUCCESS"
+    },
+    {
+        "name": "details",
+        "type": "JSON",
+        "mode": "REQUIRED",
+        "description": "Output of the processing"
+    }
+]
diff --git a/etl/load/main.py b/etl/load/main.py
index d7d310a2f..3bdfaf8b1 100644
--- a/etl/load/main.py
+++ b/etl/load/main.py
@@ -1,5 +1,6 @@
 import asyncio
 import base64
+import datetime
 import json
 import logging
 import os
@@ -8,24 +9,60 @@ import flask
 import functions_framework
 import google.cloud.bigquery as bq
 
-import metamist.parser.generic_metadata_parser as gmp
+import metamist.parser as mp
 
 BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
-
-# TODO: The following constants are going to change
-PARTICIPANT_COL_NAME = 'individual_id'
-SAMPLE_ID_COL_NAME = 'sample_id'
-SEQ_TYPE_COL_NAME = 'sequencing_type'
-SAMPLE_META_MAP = {
-    'collection_centre': 'centre',
-    'collection_date': 'collection_date',
-    'collection_specimen': 'specimen',
+BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE')
+
+DEFAULT_PARSER_PARAMS = {
+    'search_locations': [],
+    'project': 'milo-dev',
+    'participant_column': 'individual_id',
+    'sample_name_column': 'sample_id',
+    'seq_type_column': 'sequencing_type',
+    'default_sequencing_type': 'genome',
+    'default_sample_type': 'blood',
+    'default_sequencing_technology': 'short-read',
+    'sample_meta_map': {
+        'collection_centre': 'centre',
+        'collection_date': 'collection_date',
+        'collection_specimen': 'specimen',
+    },
+    'participant_meta_map': {},
+    'assay_meta_map': {},
+    'qc_meta_map': {},
 }
-DEFAULT_SEQUENCING_TYPE = 'genome'
-DEFAULT_SEQUENCING_TECHNOLOGY = 'short-read'
-METAMIST_PROJECT = 'milo-dev'
-DEFAULT_SAMPLE_TYPE = 'blood'
+
+def call_parser(parser_obj, row_json):
+    """Run the parser on a single record and capture the outcome.
+
+    Args:
+        parser_obj: an instantiated parser (e.g. GenericMetadataParser)
+        row_json: the record to be parsed
+
+    Returns:
+        tuple: status ('SUCCESS' or 'FAILED') and the parser result or the exception
+    """
+    tmp_res = []
+    tmp_status = []
+
+    # GenericMetadataParser from_json is async
+    # we call it from sync, so we need to wrap it in coroutine
+    async def run_parser_capture_result(parser_obj, row_data, res, status):
+        try:
+            # TODO better error handling
+            r = await parser_obj.from_json([row_data], confirm=False, dry_run=True)
+            res.append(r)
+            status.append('SUCCESS')
+        except Exception as e:  # pylint: disable=broad-exception-caught
+            logging.error(f'Failed to parse the row {e}')
+            # add to the output
+            res.append(e)
+            status.append('FAILED')
+
+    asyncio.run(run_parser_capture_result(parser_obj, row_json, tmp_res, tmp_status))
+    return tmp_status[0], tmp_res[0]
 
 
 @functions_framework.http
@@ -79,6 +116,14 @@ def etl_load(request: flask.Request):
             'message': f'Missing or 
empty request_id: {jbody_str}', }, 400 + # prepare parser map + # parser_map = { + # 'gmp/v1': , + # 'sfmp/v1': , + # 'bbv/v1': bbv.BbvV1Parser, TODO: add bbv parser + # } + parser_map = prepare_parser_map(mp.GenericParser, default_version='v1') + # locate the request_id in bq query = f""" SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id @@ -101,63 +146,55 @@ def etl_load(request: flask.Request): # should be only one record, look into loading multiple objects in one call? row_json = None result = None + status = None for row in query_job_result: - # example of json record: - # row_json = { - # 'sample_id': '123456', - # 'external_id': 'GRK100311', - # 'individual_id': '608', - # 'sequencing_type': 'exome', - # 'collection_centre': 'KCCG', - # 'collection_date': '2023-08-05T01:39:28.611476', - # 'collection_specimen': 'blood' - # } - # TODO - # Parse row.body -> Model and upload to metamist database + sample_type = row.type + # sample_type should be in the format /ParserName/Version e.g.: /bbv/v1 + row_json = json.loads(row.body) + # get config from payload or use default + config_data = row_json.get('config', DEFAULT_PARSER_PARAMS) + # get data from payload or use payload as data + record_data = row_json.get('data', row_json) + + parser_obj = get_parser_instance(parser_map, sample_type, config_data) + if not parser_obj: + return { + 'success': False, + 'message': f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}', + }, 412 # Precondition Failed - tmp_res = [] - - # GenericMetadataParser from_json is async - # we call it from sync, so we need to wrap it in coroutine - async def run_parser_capture_result(res, row_data): - try: - # TODO better error handling - parser = gmp.GenericMetadataParser( - search_locations=[], - project=METAMIST_PROJECT, - participant_column=PARTICIPANT_COL_NAME, - sample_name_column=SAMPLE_ID_COL_NAME, - reads_column=None, - checksum_column=None, - seq_type_column=SEQ_TYPE_COL_NAME, - default_sequencing_type=DEFAULT_SEQUENCING_TYPE, - default_sample_type=DEFAULT_SAMPLE_TYPE, - default_sequencing_technology=DEFAULT_SEQUENCING_TECHNOLOGY, - default_reference_assembly_location=None, - participant_meta_map={}, - sample_meta_map=SAMPLE_META_MAP, - assay_meta_map={}, - qc_meta_map={}, - allow_extra_files_in_search_path=None, - key_map=None, - ) - r = await parser.from_json([row_data], confirm=False, dry_run=True) - res.append(r) - except Exception as e: # pylint: disable=broad-exception-caught - logging.error(f'Failed to parse the row {e}') - # add to the output - res.append(e) - - asyncio.run(run_parser_capture_result(tmp_res, row_json)) - result = tmp_res[0] + # Parse row.body -> Model and upload to metamist database + status, result = call_parser(parser_obj, record_data) + + # log results to BIGQUERY_LOG_TABLE + bq_client.insert_rows_json( + BIGQUERY_LOG_TABLE, + [ + { + 'request_id': request_id, + 'timestamp': datetime.datetime.utcnow().isoformat(), + 'status': status, + 'details': json.dumps({'result': f"'{result}'"}), + } + ], + ) + + if status and status == 'SUCCESS': + return { + 'id': request_id, + 'record': row_json, + 'result': f"'{result}'", + 'success': True, + } + # some error happened return { 'id': request_id, 'record': row_json, 'result': f"'{result}'", - 'success': True, - } + 'success': False, + }, 500 def extract_request_id(jbody: Dict[str, Any]) -> str | None: @@ -195,3 +232,61 @@ def extract_request_id(jbody: Dict[str, Any]) -> str | None: logging.error(f'Failed to extract request_id from the payload 
{e}')
 
     return request_id
+
+
+def get_parser_instance(
+    parser_map: dict, sample_type: str | None, init_params: dict | None
+) -> object | None:
+    """Instantiate the parser class registered for the given sample_type
+
+    Args:
+        parser_map (dict): map of sample_type keys to parser classes
+        sample_type (str | None): parser key, e.g. '/gmp/v1'
+        init_params (dict | None): keyword arguments for the parser constructor
+
+    Returns:
+        object | None: parser instance, or None if it could not be created
+    """
+    if not sample_type:
+        return None
+
+    parser_class_ = parser_map.get(sample_type, None)
+    if not parser_class_:
+        # class not found
+        return None
+
+    # TODO: in case of generic metadata parser, we need to create instance
+    try:
+        if init_params:
+            parser_obj = parser_class_(**init_params)
+        else:
+            parser_obj = parser_class_()
+    except Exception as e:  # pylint: disable=broad-exception-caught
+        logging.error(f'Failed to create parser instance {e}')
+        return None
+
+    return parser_obj
+
+
+def all_subclasses(cls: type) -> set:
+    """Recursively find all subclasses of cls"""
+    return set(cls.__subclasses__()).union(
+        [s for c in cls.__subclasses__() for s in all_subclasses(c)]
+    )
+
+
+def prepare_parser_map(cls, default_version='v1'):
+    """Prepare the parser map (sample_type key -> parser class) for the given class
+
+    Args:
+        cls: class to find subclasses of
+        default_version: version suffix used in the map keys
+    Returns:
+        parser map
+    """
+    parser_map = {}
+    for parser_cls in all_subclasses(cls):
+        parser_code = ''.join(
+            [ch for ch in str(parser_cls).rsplit('.', maxsplit=1)[-1] if ch.isupper()]
+        )
+        parser_map[f'/{parser_code.lower()}/{default_version}'] = parser_cls
+
+    return parser_map
diff --git a/etl/load/metamist-6.2.0.tar.gz b/etl/load/metamist-6.2.0.tar.gz
index ca01825f4..6c1ef9720 100644
Binary files a/etl/load/metamist-6.2.0.tar.gz and b/etl/load/metamist-6.2.0.tar.gz differ
diff --git a/etl/test/test_etl_load.py b/etl/test/test_etl_load.py
index f31ac9929..0b54617ca 100644
--- a/etl/test/test_etl_load.py
+++ b/etl/test/test_etl_load.py
@@ -1,18 +1,19 @@
 import base64
 import json
 from test.testbase import DbIsolatedTest, run_as_sync
-from unittest.mock import AsyncMock, MagicMock, patch
+from unittest.mock import MagicMock, patch
 
 import etl.load.main
-from db.python.layers.family import FamilyLayer
-from db.python.layers.participant import ParticipantLayer
-from metamist.parser.generic_metadata_parser import GenericMetadataParser
-from models.models import ParticipantUpsertInternal
+import metamist.parser as mp
 
-ETL_SAMPLE_RECORD = """
+ETL_SAMPLE_RECORD_1 = """
 {"identifier": "AB0002", "name": "j smith", "age": 50, "measurement": "98.7", "observation": "B++", "receipt_date": "1/02/2023"}
 """
 
+ETL_SAMPLE_RECORD_2 = """
+{"sample_id": "123456", "external_id": "GRK100311", "individual_id": "608", "sequencing_type": "exome", "collection_centre": "KCCG", "collection_date": "2023-08-05T01:39:28.611476", "collection_specimen": "blood"}
+"""
+
 
 class TestEtlLoad(DbIsolatedTest):
     """Test etl cloud functions"""
@@ -21,36 +22,6 @@ class TestEtlLoad(DbIsolatedTest):
     async def setUp(self) -> None:
         super().setUp()
 
-        fl = FamilyLayer(self.connection)
-
-        self.fid_1 = await fl.create_family(external_id='FAM01')
-        self.fid_2 = await fl.create_family(external_id='FAM02')
-
-        pl = ParticipantLayer(self.connection)
-        self.pid = (
-            await pl.upsert_participant(
-                ParticipantUpsertInternal(external_id='EX01', reported_sex=2)
-            )
-        ).id
-        self.pat_pid = (
-            await pl.upsert_participant(
-                ParticipantUpsertInternal(external_id='EX01_pat', reported_sex=1)
-            )
-        ).id
-        self.mat_pid = (
-            await pl.upsert_participant(
-                ParticipantUpsertInternal(external_id='EX01_mat', reported_sex=2)
-            )
-        ).id
-
-        await pl.add_participant_to_family(
-            family_id=self.fid_1,
-            
participant_id=self.pid, - paternal_id=self.pat_pid, - maternal_id=self.mat_pid, - affected=2, - ) - @run_as_sync @patch('etl.load.main.bq.Client', autospec=True) async def test_etl_load_not_found_record(self, bq_client): @@ -83,16 +54,17 @@ async def test_etl_load_not_found_record(self, bq_client): @run_as_sync @patch('etl.load.main.bq.Client', autospec=True) - @patch('etl.load.main.gmp.GenericMetadataParser', autospec=True) - async def test_etl_load_found_record_simple_payload(self, gm_parser, bq_client): + @patch('etl.load.main.call_parser') + async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client): """Test etl load simple payload""" request = MagicMock( args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json'] ) request.get_json.return_value = json.loads('{"request_id": "1234567890"}') - query_row = MagicMock(args={}, spec=['body']) - query_row.body = ETL_SAMPLE_RECORD + query_row = MagicMock(args={}, spec=['body', 'type']) + query_row.body = ETL_SAMPLE_RECORD_2 + query_row.type = '/gmp/v1' query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) query_job_result.total_rows = 1 @@ -104,17 +76,14 @@ async def test_etl_load_found_record_simple_payload(self, gm_parser, bq_client): bq_client_instance = bq_client.return_value bq_client_instance.query.return_value = query_result - # TODO mockup GenericMetadataParser from_json with the right output - gm_parser_instance = gm_parser.return_value - # mock from_json return value, keep empty atm - gm_parser_instance.from_json = AsyncMock(return_value='') + call_parser.return_value = ('SUCCESS', '') response = etl.load.main.etl_load(request) self.assertDictEqual( response, { 'id': '1234567890', - 'record': json.loads(ETL_SAMPLE_RECORD), + 'record': json.loads(ETL_SAMPLE_RECORD_2), 'result': "''", 'success': True, }, @@ -122,8 +91,8 @@ async def test_etl_load_found_record_simple_payload(self, gm_parser, bq_client): @run_as_sync @patch('etl.load.main.bq.Client', autospec=True) - @patch('etl.load.main.gmp.GenericMetadataParser', autospec=True) - async def test_etl_load_found_record_pubsub_payload(self, gm_parser, bq_client): + @patch('etl.load.main.call_parser') + async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client): """Test etl load pubsub payload""" request = MagicMock( args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json'] @@ -150,8 +119,9 @@ async def test_etl_load_found_record_pubsub_payload(self, gm_parser, bq_client): request.get_json.return_value = pubsub_payload_example - query_row = MagicMock(args={}, spec=['body']) - query_row.body = ETL_SAMPLE_RECORD + query_row = MagicMock(args={}, spec=['body', 'type']) + query_row.body = ETL_SAMPLE_RECORD_2 + query_row.type = '/gmp/v1' query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) query_job_result.total_rows = 1 @@ -163,17 +133,14 @@ async def test_etl_load_found_record_pubsub_payload(self, gm_parser, bq_client): bq_client_instance = bq_client.return_value bq_client_instance.query.return_value = query_result - # TODO mockup GenericMetadataParser from_json with the right output - gm_parser_instance = gm_parser.return_value - # mock from_json return value, keep empty atm - gm_parser_instance.from_json = AsyncMock(return_value='') + call_parser.return_value = ('SUCCESS', '') response = etl.load.main.etl_load(request) self.assertDictEqual( response, { 'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836', - 'record': json.loads(ETL_SAMPLE_RECORD), + 'record': json.loads(ETL_SAMPLE_RECORD_2), 'result': "''", 
'success': True, }, @@ -201,7 +168,7 @@ async def test_etl_load_parser( default_sequencing_technology = 'short-read' # parser = - GenericMetadataParser( + mp.GenericMetadataParser( search_locations=[], project=self.project_name, participant_column=PARTICIPANT_COL_NAME, @@ -235,4 +202,4 @@ async def test_etl_load_parser( # res = await parser.from_json(json_data, confirm=False, dry_run=True) # print(res) - assert True + # assert False diff --git a/metamist/parser/__init__.py b/metamist/parser/__init__.py index e69de29bb..0a96943b6 100644 --- a/metamist/parser/__init__.py +++ b/metamist/parser/__init__.py @@ -0,0 +1,5 @@ +# Parser package for the metamist project +from .cloudhelper import CloudHelper +from .generic_metadata_parser import GenericMetadataParser +from .generic_parser import GenericParser +from .sample_file_map_parser import SampleFileMapParser diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index 18df0c134..e1350cbb1 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -19,6 +19,7 @@ # ETL_FOLDER = Path(__file__).parent / 'etl' ETL_FOLDER = Path(__file__).parent.parent / 'etl' PATH_TO_ETL_BQ_SCHEMA = ETL_FOLDER / 'bq_schema.json' +PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / 'bq_log_schema.json' # TODO: update implementation in cpg_infra project to enable binary files @@ -304,17 +305,14 @@ def etl_bigquery_dataset(self): ), ) - @cached_property - def etl_bigquery_table(self): - """ - Bigquery table to contain the etl data - """ - with open(PATH_TO_ETL_BQ_SCHEMA) as f: + def _setup_bq_table(self, schema_file_name: str, table_name: str): + """Setup Bigquery table""" + with open(schema_file_name) as f: schema = f.read() etl_table = gcp.bigquery.Table( - 'metamist-etl-bigquery-table', - table_id='etl-data', + f'metamist-etl-bigquery-table-{table_name}', + table_id=f'etl-{table_name}', dataset_id=self.etl_bigquery_dataset.dataset_id, labels={'project': 'metamist'}, schema=schema, @@ -326,9 +324,22 @@ def etl_bigquery_table(self): depends_on=[self.etl_bigquery_dataset], ), ) - return etl_table + @cached_property + def etl_bigquery_table(self): + """ + Bigquery table to contain the etl data + """ + return self._setup_bq_table(PATH_TO_ETL_BQ_SCHEMA, 'data') + + @cached_property + def etl_bigquery_log_table(self): + """ + Bigquery table to contain the etl logs + """ + return self._setup_bq_table(PATH_TO_ETL_BQ_LOG_SCHEMA, 'logs') + @cached_property def slack_channel(self): """ @@ -386,19 +397,19 @@ def setup_etl(self): """ setup_etl """ - # give the etl_load/extracr_service_accounts ability to read/write to bq table + # give the etl_load/extract service_accounts ability to read/write to bq table gcp.bigquery.DatasetAccess( - 'metamist-etl-bq-dataset-write-access', + 'metamist-etl-bq-dataset-extract-service-access', project=self.config.sample_metadata.gcp.project, dataset_id=self.etl_bigquery_dataset.dataset_id, role='WRITER', user_by_email=self.etl_extract_service_account.email, ) gcp.bigquery.DatasetAccess( - 'metamist-etl-bq-dataset-read-access', + 'metamist-etl-bq-dataset-load-service-access', project=self.config.sample_metadata.gcp.project, dataset_id=self.etl_bigquery_dataset.dataset_id, - role='READER', + role='WRITER', user_by_email=self.etl_load_service_account.email, ) # give the etl_load_service_account ability to execute bigquery jobs @@ -512,6 +523,13 @@ def _etl_function(self, f_name: str, sa_email: str): '.', self.etl_bigquery_table.table_id, ), + 'BIGQUERY_LOG_TABLE': pulumi.Output.concat( + 
self.etl_bigquery_log_table.project, + '.', + self.etl_bigquery_log_table.dataset_id, + '.', + self.etl_bigquery_log_table.table_id, + ), 'PUBSUB_TOPIC': self.etl_pubsub_topic.id, 'SM_ENVIRONMENT': 'DEVELOPMENT', # TODO: make it configurable },