From 8b7b69e07df9d827b60009fa2e49878ce1dec54c Mon Sep 17 00:00:00 2001 From: Michael Franklin <22381693+illusional@users.noreply.github.com> Date: Fri, 8 Mar 2024 14:49:57 +1100 Subject: [PATCH] Etl load trials (#695) * Fix parser config format + not row list * Add fallback for non-list rows in from_json * Linting * Linting II --------- Co-authored-by: Michael Franklin --- etl/load/main.py | 46 +++++++++++++++++------- etl/test/test_etl_load.py | 60 +++++++++++++++++-------------- metamist/parser/generic_parser.py | 3 ++ 3 files changed, 71 insertions(+), 38 deletions(-) diff --git a/etl/load/main.py b/etl/load/main.py index 65e7a2dc6..28f3b291d 100644 --- a/etl/load/main.py +++ b/etl/load/main.py @@ -1,6 +1,7 @@ import asyncio import base64 import datetime +import importlib.metadata import json import logging import os @@ -10,7 +11,6 @@ import flask import functions_framework import google.cloud.bigquery as bq -import pkg_resources from google.cloud import pubsub_v1, secretmanager from metamist.parser.generic_parser import GenericParser # type: ignore @@ -25,14 +25,23 @@ @lru_cache def _get_bq_client(): + assert BIGQUERY_TABLE, 'BIGQUERY_TABLE is not set' + assert BIGQUERY_LOG_TABLE, 'BIGQUERY_LOG_TABLE is not set' return bq.Client() @lru_cache def _get_secret_manager(): + assert ETL_ACCESSOR_CONFIG_SECRET, 'CONFIGURATION_SECRET is not set' return secretmanager.SecretManagerServiceClient() +@lru_cache +def _get_pubsub_client(): + assert NOTIFICATION_PUBSUB_TOPIC, 'NOTIFICATION_PUBSUB_TOPIC is not set' + return pubsub_v1.PublisherClient() + + class ParsingStatus: """ Enum type to distinguish between sucess and failure of parsing @@ -65,7 +74,7 @@ def call_parser(parser_obj, row_json) -> tuple[str, str]: async def run_parser_capture_result(parser_obj, row_data, res, status): try: # TODO better error handling - r = await parser_obj.from_json([row_data], confirm=False, dry_run=True) + r = await parser_obj.from_json(row_data, confirm=False) res.append(r) status.append(ParsingStatus.SUCCESS) except Exception as e: # pylint: disable=broad-exception-caught @@ -148,7 +157,7 @@ def process_rows( # publish to notification pubsub msg_title = 'Metamist ETL Load Failed' try: - pubsub_client = pubsub_v1.PublisherClient() + pubsub_client = _get_pubsub_client() pubsub_client.publish( NOTIFICATION_PUBSUB_TOPIC, json.dumps({'title': msg_title} | log_record).encode(), @@ -212,6 +221,15 @@ def etl_load(request: flask.Request): 'message': f'Missing or empty request_id: {jbody_str}', }, 400 + return process_request(request_id, delivery_attempt) + + +def process_request( + request_id: str, delivery_attempt: int | None = None +) -> tuple[dict, int]: + """ + Process request_id, delivery_attempt and return result + """ # locate the request_id in bq query = f""" SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id @@ -322,13 +340,16 @@ def get_parser_instance( accessor_config: dict[ str, - list[ - dict[ - Literal['name'] - | Literal['parser_name'] - | Literal['default_parameters'], - Any, - ] + dict[ + Literal['parsers'], + list[ + dict[ + Literal['name'] + | Literal['parser_name'] + | Literal['default_parameters'], + Any, + ] + ], ], ] = get_accessor_config() @@ -341,7 +362,7 @@ def get_parser_instance( etl_accessor_config = next( ( accessor_config - for accessor_config in accessor_config[submitting_user] + for accessor_config in accessor_config[submitting_user].get('parsers', []) if accessor_config['name'].strip(STRIP_CHARS) == request_type.strip(STRIP_CHARS) ), @@ -387,7 +408,8 @@ def prepare_parser_map() -> dict[str, type[GenericParser]]: loop through metamist_parser entry points and create map of parsers """ parser_map = {} - for entry_point in pkg_resources.iter_entry_points('metamist_parser'): + + for entry_point in importlib.metadata.entry_points().get('metamist_parser'): parser_cls = entry_point.load() parser_short_name, parser_version = parser_cls.get_info() parser_map[f'{parser_short_name}/{parser_version}'] = parser_cls diff --git a/etl/test/test_etl_load.py b/etl/test/test_etl_load.py index 203bc9f33..a1ccbb4c1 100644 --- a/etl/test/test_etl_load.py +++ b/etl/test/test_etl_load.py @@ -200,13 +200,15 @@ def test_get_parser_instance_success( """Test get_parser_instance success""" mock_get_accessor_config.return_value = { - 'user@test.com': [ - { - 'name': 'test/v1', - 'default_parameters': {}, - # 'parser_name': 'test', - } - ] + 'user@test.com': { + 'parsers': [ + { + 'name': 'test/v1', + 'default_parameters': {}, + # 'parser_name': 'test', + } + ] + } } mock_prepare_parser_map.return_value = { @@ -234,13 +236,15 @@ def test_get_parser_instance_different_parser_name( """Test get_parser_instance success""" mock_get_accessor_config.return_value = { - 'user@test.com': [ - { - 'name': 'test/v1', - 'default_parameters': {'project': 'test'}, - 'parser_name': 'different_parser/name', - } - ] + 'user@test.com': { + 'parsers': [ + { + 'name': 'test/v1', + 'default_parameters': {'project': 'test'}, + 'parser_name': 'different_parser/name', + } + ] + } } mock_prepare_parser_map.return_value = { @@ -284,12 +288,14 @@ def test_get_parser_no_matching_config( """Test get_parser_instance success""" mock_get_accessor_config.return_value = { - 'user@test.com': [ - { - 'name': 'test/v1', - 'default_parameters': {'project': 'test'}, - } - ] + 'user@test.com': { + 'parsers': [ + { + 'name': 'test/v1', + 'default_parameters': {'project': 'test'}, + } + ] + } } # this doesn't need to be mocked as it fails before here @@ -316,12 +322,14 @@ def test_get_parser_no_matching_parser( """Test get_parser_instance success""" mock_get_accessor_config.return_value = { - 'user@test.com': [ - { - 'name': 'a/b', - 'default_parameters': {'project': 'test'}, - } - ] + 'user@test.com': { + 'parsers': [ + { + 'name': 'a/b', + 'default_parameters': {'project': 'test'}, + } + ] + } } mock_prepare_parser_map.return_value = { diff --git a/metamist/parser/generic_parser.py b/metamist/parser/generic_parser.py index 8dc2de5d7..fbe95136b 100644 --- a/metamist/parser/generic_parser.py +++ b/metamist/parser/generic_parser.py @@ -534,6 +534,9 @@ async def from_json(self, rows, confirm=False, dry_run=False): If no participants are present, groups samples by their IDs. For each sample, gets its sequencing groups by their keys. For each sequencing group, groups assays and analyses. """ + if not isinstance(rows, list): + rows = [rows] + await self.validate_rows(rows) # one participant with no value