From 6ffd5927087c76047970748256f81aa01c21528d Mon Sep 17 00:00:00 2001 From: Milo Hyben Date: Wed, 23 Aug 2023 15:46:09 +1000 Subject: [PATCH] Implement simple unit tests for etl/load and etl/extract. --- etl/extract/main.py | 12 ++- etl/load/main.py | 5 +- metamist/parser/sample_json_parser.py | 2 +- requirements-dev.txt | 4 +- test/test_etl.py | 130 ++++++++++++++++++++++++++ test/test_parse_sample_json.py | 14 +-- 6 files changed, 151 insertions(+), 16 deletions(-) create mode 100644 test/test_etl.py diff --git a/etl/extract/main.py b/etl/extract/main.py index 72c8ef788..6e7353817 100644 --- a/etl/extract/main.py +++ b/etl/extract/main.py @@ -14,9 +14,6 @@ BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE') PUBSUB_TOPIC = os.getenv('PUBSUB_TOPIC') -_BQ_CLIENT = bq.Client() -_PUBSUB_CLIENT = pubsub_v1.PublisherClient() - @functions_framework.http def etl_extract(request: flask.Request): @@ -41,6 +38,7 @@ def etl_extract(request: flask.Request): request_id = str(uuid.uuid4()) jbody = request.json + if callable(jbody): # request.json is it in reality, but the type checker is saying it's callable jbody = jbody() @@ -63,9 +61,13 @@ def etl_extract(request: flask.Request): } # throw an exception if one occurs - errors = _BQ_CLIENT.insert_rows_json( + bq_client = bq.Client() + pubsub_client = pubsub_v1.PublisherClient() + + errors = bq_client.insert_rows_json( BIGQUERY_TABLE, [bq_obj | {'body': jbody_str}] ) + if errors: return { 'success': False, @@ -77,7 +79,7 @@ def etl_extract(request: flask.Request): # message contains all the attributes except body which can be large # and already stored in BQ table try: - _PUBSUB_CLIENT.publish( + pubsub_client.publish( PUBSUB_TOPIC, json.dumps(bq_obj).encode() ) diff --git a/etl/load/main.py b/etl/load/main.py index e6ac4c1b9..f1010ff4e 100644 --- a/etl/load/main.py +++ b/etl/load/main.py @@ -11,8 +11,6 @@ BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE') -_BQ_CLIENT = bq.Client() - @functions_framework.http def etl_load(request: 
flask.Request): @@ -77,9 +75,10 @@ def etl_load(request: flask.Request): bq.ScalarQueryParameter('request_id', 'STRING', request_id), ] + bq_client = bq.Client() job_config = bq.QueryJobConfig() job_config.query_parameters = query_params - query_job_result = _BQ_CLIENT.query(query, job_config=job_config).result() + query_job_result = bq_client.query(query, job_config=job_config).result() if query_job_result.total_rows == 0: return { diff --git a/metamist/parser/sample_json_parser.py b/metamist/parser/sample_json_parser.py index cdc03b41f..9f44576db 100644 --- a/metamist/parser/sample_json_parser.py +++ b/metamist/parser/sample_json_parser.py @@ -67,4 +67,4 @@ async def parse( self, record: str, confirm=False, dry_run=False ): """Parse passed record """ - raise NotImplementedError('TO BE IMPLEMENTED') + raise NotImplementedError() diff --git a/requirements-dev.txt b/requirements-dev.txt index b107ca3c2..0cba71ad9 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -9,4 +9,6 @@ testcontainers[mariadb] nest-asyncio coverage types-PyMySQL -pytest +functions_framework +google-cloud-bigquery +google-cloud-pubsub diff --git a/test/test_etl.py b/test/test_etl.py new file mode 100644 index 000000000..4a61811ac --- /dev/null +++ b/test/test_etl.py @@ -0,0 +1,130 @@ +import json +import unittest +from unittest.mock import patch, MagicMock + +import etl.extract.main +import etl.load.main + +"""" execute only this test +python -m unittest discover -s test -k test_etl +""" + +ETL_SAMPLE_RECORD = """ +{"identifier": "AB0002", "name": "j smith", "age": 50, "measurement": "98.7", "observation": "B++", "receipt_date": "1/02/2023"} +""" + + +class TestEtl(unittest.TestCase): + """Test etl cloud functions""" + + @patch('etl.extract.main.email_from_id_token') + @patch('etl.extract.main.uuid.uuid4') + @patch('etl.extract.main.bq.Client', autospec=True) + @patch('etl.extract.main.pubsub_v1.PublisherClient', autospec=True) + def test_etl_extract_valid_payload( + self, 
pubsub_client, bq_client, uuid4_fun, email_from_id_token_fun + ): + request = MagicMock(args={}, spec=['__len__', 'toJSON', 'authorization']) + request.json = json.loads(ETL_SAMPLE_RECORD) + request.path = '' + + bq_client_instance = bq_client.return_value + bq_client_instance.insert_rows_json.return_value = None + + pubsub_client_instance = pubsub_client.return_value + pubsub_client_instance.publish.return_value = None + + email_from_id_token_fun.return_value = 'test@email.com' + uuid4_fun.return_value = '1234567890' + + response = etl.extract.main.etl_extract(request) + + assert response == {'id': uuid4_fun.return_value, 'success': True} + + @patch('etl.load.main.bq.Client', autospec=True) + def test_etl_load_not_found_record( + self, bq_client + ): + request = MagicMock(args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json']) + request.get_json.return_value = json.loads('{"request_id": "1234567890"}') + + query_job_result = MagicMock(items=[], args={}, spec=[]) + query_job_result.total_rows = 0 + + query_result = MagicMock(args={}, spec=['result']) + query_result.result.return_value = query_job_result + + bq_client_instance = bq_client.return_value + bq_client_instance.query.return_value = query_result + + response = etl.load.main.etl_load(request) + assert response == ({'success': False, 'message': 'Record with id: 1234567890 not found'}, 404) + + @patch('etl.load.main.bq.Client', autospec=True) + def test_etl_load_found_record_simple_payload( + self, bq_client + ): + request = MagicMock(args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json']) + request.get_json.return_value = json.loads('{"request_id": "1234567890"}') + + query_row = MagicMock(args={}, spec=['body']) + query_row.body = ETL_SAMPLE_RECORD + + query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) + query_job_result.total_rows = 1 + query_job_result.__iter__.return_value = [query_row] + + query_result = MagicMock(args={}, spec=['result']) + 
query_result.result.return_value = query_job_result + + bq_client_instance = bq_client.return_value + bq_client_instance.query.return_value = query_result + + response = etl.load.main.etl_load(request) + + assert response == { + 'id': '1234567890', + 'record': json.loads(ETL_SAMPLE_RECORD), + 'success': True + } + + @patch('etl.load.main.bq.Client', autospec=True) + def test_etl_load_found_record_pubsub_payload( + self, bq_client + ): + request = MagicMock(args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json']) + + pubsub_payload_example = { + 'deliveryAttempt': 1, + 'message': { + 'data': 'eyJyZXF1ZXN0X2lkIjogIjZkYzRiOWFlLTc0ZWUtNDJlZS05Mjk4LWIwYTUxZDVjNjgzNiIsICJ0aW1lc3RhbXAiOiAiMjAyMy0wOC0yMlQwMDo1OTo0My40ODU5MjYiLCAidHlwZSI6ICIvIiwgInN1Ym1pdHRpbmdfdXNlciI6ICJtaWxvc2xhdi5oeWJlbkBwb3B1bGF0aW9uZ2Vub21pY3Mub3JnLmF1In0=', + 'messageId': '8130738175803139', + 'message_id': '8130738175803139', + 'publishTime': '2023-08-22T00:59:44.062Z', + 'publish_time': '2023-08-22T00:59:44.062Z' + }, + 'subscription': 'projects/project/subscriptions/metamist-etl-subscription-ab63723' + } + + request.get_json.return_value = pubsub_payload_example + + query_row = MagicMock(args={}, spec=['body']) + query_row.body = ETL_SAMPLE_RECORD + + query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) + query_job_result.total_rows = 1 + query_job_result.__iter__.return_value = [query_row] + + query_result = MagicMock(args={}, spec=['result']) + query_result.result.return_value = query_job_result + + bq_client_instance = bq_client.return_value + bq_client_instance.query.return_value = query_result + + response = etl.load.main.etl_load(request) + + assert response == { + 'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836', + 'record': json.loads(ETL_SAMPLE_RECORD), + 'success': True + } diff --git a/test/test_parse_sample_json.py b/test/test_parse_sample_json.py index dad7e769b..26039debf 100644 --- a/test/test_parse_sample_json.py +++ b/test/test_parse_sample_json.py @@ -1,6 
+1,4 @@ from test.testbase import DbIsolatedTest, run_as_sync -import pytest - from metamist.parser.sample_json_parser import SampleJsonParser @@ -8,7 +6,7 @@ class TestSampleJsonParser(DbIsolatedTest): """Test the SampleJsonParser""" @run_as_sync - async def test_empty_json(self): + async def test_sample_parser_empty_json(self): """ Test empty json """ @@ -21,9 +19,13 @@ async def test_empty_json(self): # TODO # check the output of parse fun - # for time being check for Exception - - with pytest.raises(NotImplementedError): + # for the time being, check that NotImplementedError is raised + try: await parser.parse( empty_record, dry_run=True ) + result = False + except NotImplementedError: + result = True + + self.assertTrue(result, msg='Parse did not cause the wanted error.')