Skip to content

Commit

Permalink
Updated unit tests for etl functions, added mock for GenericMetadataParser.
Browse files Browse the repository at this point in the history
  • Loading branch information
milo-hyben committed Sep 4, 2023
1 parent 27e3cef commit f149a72
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 87 deletions.
75 changes: 42 additions & 33 deletions etl/load/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import flask
import functions_framework
import google.cloud.bigquery as bq
from metamist.parser.generic_metadata_parser import GenericMetadataParser
import metamist.parser.generic_metadata_parser as gmp

BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')

Expand Down Expand Up @@ -105,42 +105,51 @@ def etl_load(request: flask.Request):
row_json = None
result = None
for row in query_job_result:
"""example of json record:
row_json = {
'sample_id': '123456',
'external_id': 'GRK100311',
'individual_id': '608',
'sequencing_type': 'exome',
'collection_centre': 'KCCG',
'collection_date': '2023-08-05T01:39:28.611476',
'collection_specimen': 'blood'
}
"""
# example of json record:
# row_json = {
# 'sample_id': '123456',
# 'external_id': 'GRK100311',
# 'individual_id': '608',
# 'sequencing_type': 'exome',
# 'collection_centre': 'KCCG',
# 'collection_date': '2023-08-05T01:39:28.611476',
# 'collection_specimen': 'blood'
# }
# TODO
# Parse row.body -> Model and upload to metamist database
row_json = json.loads(row.body)

parser = GenericMetadataParser(
search_locations=[],
project=METAMIST_PROJECT,
participant_column=PARTICIPANT_COL_NAME,
sample_name_column=SAMPLE_ID_COL_NAME,
reads_column=None,
checksum_column=None,
seq_type_column=SEQ_TYPE_COL_NAME,
default_sequencing_type=DEFAULT_SEQUENCING_TYPE,
default_sample_type=DEFAULT_SAMPLE_TYPE,
default_sequencing_technology=DEFAULT_SEQUENCING_TECHNOLOGY,
default_reference_assembly_location=None,
participant_meta_map={},
sample_meta_map=SAMPLE_META_MAP,
assay_meta_map={},
qc_meta_map={},
allow_extra_files_in_search_path=None,
key_map=None,
)
result = parser.from_json([row_json], confirm=False, dry_run=True)
asyncio.run(result)
tmp_res = []

# GenericMetadataParser from_json is async
# we call it from sync, so we need to wrap it in coroutine
async def run_parser_capture_result(res, row_data):
    """Parse one ETL row with GenericMetadataParser and collect the result.

    The parser's ``from_json`` is a coroutine, but the surrounding handler is
    synchronous; the parsed result is therefore handed back by appending it to
    *res* (a caller-owned list) rather than by returning it across the
    sync/async boundary.
    """
    # All parser options in one place; constants come from module level.
    parser_options = dict(
        search_locations=[],
        project=METAMIST_PROJECT,
        participant_column=PARTICIPANT_COL_NAME,
        sample_name_column=SAMPLE_ID_COL_NAME,
        reads_column=None,
        checksum_column=None,
        seq_type_column=SEQ_TYPE_COL_NAME,
        default_sequencing_type=DEFAULT_SEQUENCING_TYPE,
        default_sample_type=DEFAULT_SAMPLE_TYPE,
        default_sequencing_technology=DEFAULT_SEQUENCING_TECHNOLOGY,
        default_reference_assembly_location=None,
        participant_meta_map={},
        sample_meta_map=SAMPLE_META_MAP,
        assay_meta_map={},
        qc_meta_map={},
        allow_extra_files_in_search_path=None,
        key_map=None,
    )
    metadata_parser = gmp.GenericMetadataParser(**parser_options)
    # dry_run + no confirm: parse/validate only, nothing is persisted here.
    parsed = await metadata_parser.from_json([row_data], confirm=False, dry_run=True)
    res.append(parsed)

loop = asyncio.get_event_loop()
coroutine = run_parser_capture_result(tmp_res, row_json)
loop.run_until_complete(coroutine)
result = tmp_res[0]

return {
'id': request_id,
Expand Down
96 changes: 56 additions & 40 deletions etl/test/test_etl_load.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import base64
import json
from test.testbase import DbIsolatedTest, run_as_sync
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, AsyncMock, patch

import etl.load.main
from db.python.layers.family import FamilyLayer
Expand Down Expand Up @@ -75,19 +75,19 @@ async def test_etl_load_not_found_record(self, bq_client):
bq_client_instance = bq_client.return_value
bq_client_instance.query.return_value = query_result

# TODO mockup GenericMetadataParser before uncommenting
# response = etl.load.main.etl_load(request)
# assert response == (
# {
# 'success': False,
# 'message': 'Record with id: 1234567890 not found',
# },
# 404,
# )
response = etl.load.main.etl_load(request)
assert response == (
{
'success': False,
'message': 'Record with id: 1234567890 not found',
},
404,
)

@run_as_sync
@patch('etl.load.main.bq.Client', autospec=True)
async def test_etl_load_found_record_simple_payload(self, bq_client):
@patch('etl.load.main.gmp.GenericMetadataParser', autospec=True)
async def test_etl_load_found_record_simple_payload(self, gm_parser, bq_client):
"""Test etl load simple payload"""
request = MagicMock(
args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json']
Expand All @@ -109,17 +109,23 @@ async def test_etl_load_found_record_simple_payload(self, bq_client):
bq_client_instance = bq_client.return_value
bq_client_instance.query.return_value = query_result

# TODO mockup GenericMetadataParser before uncommenting
# response = etl.load.main.etl_load(request)
# assert response == {
# 'id': '1234567890',
# 'record': json.loads(ETL_SAMPLE_RECORD),
# 'success': True,
# }
# TODO mockup GenericMetadataParser from_json with the right output
gm_parser_instance = gm_parser.return_value
# mock from_json return value, keep empty atm
gm_parser_instance.from_json = AsyncMock(return_value='')

response = etl.load.main.etl_load(request)
assert response == {
'id': '1234567890',
'record': json.loads(ETL_SAMPLE_RECORD),
'result': "''",
'success': True,
}

@run_as_sync
@patch('etl.load.main.bq.Client', autospec=True)
async def test_etl_load_found_record_pubsub_payload(self, bq_client):
@patch('etl.load.main.gmp.GenericMetadataParser', autospec=True)
async def test_etl_load_found_record_pubsub_payload(self, gm_parser, bq_client):
"""Test etl load pubsub payload"""
request = MagicMock(
args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json']
Expand Down Expand Up @@ -159,18 +165,26 @@ async def test_etl_load_found_record_pubsub_payload(self, bq_client):
bq_client_instance = bq_client.return_value
bq_client_instance.query.return_value = query_result

# response = etl.load.main.etl_load(request)
# assert response == {
# 'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836',
# 'record': json.loads(ETL_SAMPLE_RECORD),
# 'success': True,
# }
# TODO mockup GenericMetadataParser from_json with the right output
gm_parser_instance = gm_parser.return_value
# mock from_json return value, keep empty atm
gm_parser_instance.from_json = AsyncMock(return_value='')

response = etl.load.main.etl_load(request)
assert response == {
'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836',
'record': json.loads(ETL_SAMPLE_RECORD),
'result': "''",
'success': True,
}

@run_as_sync
async def test_etl_load_parser(
self,
):
"""Test etl load parser"""
"""Test simple parsing of json data
Comment out if you want to test using LOCAL environment,
"""

PARTICIPANT_COL_NAME = 'individual_id'
SAMPLE_ID_COL_NAME = 'sample_id'
Expand All @@ -182,23 +196,13 @@ async def test_etl_load_parser(
'collection_specimen': 'specimen',
}

json_data = [
{
'sample_id': '123456',
'external_id': 'GRK100311',
'individual_id': '608',
'sequencing_type': 'exome',
'collection_centre': 'KCCG',
'collection_date': '2023-08-05T01:39:28.611476',
'collection_specimen': 'blood',
}
]
default_sequencing_type = 'genome'
default_sequencing_technology = 'short-read'

parser = GenericMetadataParser(
# parser =
GenericMetadataParser(
search_locations=[],
project=self.project_name, # 'greek-myth', #
project=self.project_name,
participant_column=PARTICIPANT_COL_NAME,
sample_name_column=SAMPLE_ID_COL_NAME,
reads_column=None,
Expand All @@ -216,6 +220,18 @@ async def test_etl_load_parser(
key_map=None,
)

# TODO mockup GenericMetadataParser before uncommenting
# json_data = [
# {
# 'sample_id': '123456',
# 'external_id': 'GRK100311',
# 'individual_id': '608',
# 'sequencing_type': 'exome',
# 'collection_centre': 'KCCG',
# 'collection_date': '2023-08-05T01:39:28.611476',
# 'collection_specimen': 'blood',
# }
# ]
# res = await parser.from_json(json_data, confirm=False, dry_run=True)
# print(res)

assert True
28 changes: 14 additions & 14 deletions metamist_infrastructure/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,13 @@ def setup_etl(self):
),
)

self.setup_etl_functions()
self.setup_etl_pubsub()
self._setup_etl_functions()
self._setup_etl_pubsub()

self.setup_metamist_etl_accessors()
self.setup_slack_notification()
self._setup_metamist_etl_accessors()
self._setup_slack_notification()

def setup_etl_functions(self):
def _setup_etl_functions(self):
"""
setup_etl_functions
"""
Expand All @@ -402,7 +402,7 @@ def setup_etl_functions(self):
depends_on=[self.etl_extract_function, self.etl_load_function],
)

def setup_etl_pubsub(self):
def _setup_etl_pubsub(self):
"""
setup_etl_pubsub
"""
Expand All @@ -416,16 +416,16 @@ def setup_etl_pubsub(self):
@cached_property
def etl_extract_function(self):
"""etl_extract_function"""
return self.etl_function(
return self._etl_function(
'extract', self.etl_extract_service_account.email
)

@cached_property
def etl_load_function(self):
"""etl_load_function"""
return self.etl_function('load', self.etl_load_service_account.email)
return self._etl_function('load', self.etl_load_service_account.email)

def etl_function(self, f_name: str, sa_email: str):
def _etl_function(self, f_name: str, sa_email: str):
"""
Driver function to setup the etl cloud function
"""
Expand Down Expand Up @@ -493,7 +493,7 @@ def etl_function(self, f_name: str, sa_email: str):

return fxn

def setup_metamist_etl_accessors(self):
def _setup_metamist_etl_accessors(self):
for name, sa in self.etl_accessors.items():
gcp.cloudfunctionsv2.FunctionIamMember(
f'metamist-etl-accessor-{name}',
Expand All @@ -513,7 +513,7 @@ def setup_metamist_etl_accessors(self):
member=pulumi.Output.concat('serviceAccount:', sa.email),
)

def setup_function_slack_notification(self, etl_fun_name: str):
def _setup_function_slack_notification(self, etl_fun_name: str):
"""
setup slack notification for etl_fun cloud function
"""
Expand Down Expand Up @@ -556,9 +556,9 @@ def setup_function_slack_notification(self, etl_fun_name: str):
opts=pulumi.ResourceOptions(depends_on=[etl_fun]),
)

def setup_slack_notification(self):
def _setup_slack_notification(self):
if self.slack_channel is None:
return

self.setup_function_slack_notification('extract')
self.setup_function_slack_notification('load')
self._setup_function_slack_notification('extract')
self._setup_function_slack_notification('load')

0 comments on commit f149a72

Please sign in to comment.