Etl load trials (#695)
* Fix parser config format (parser entries now nested under a 'parsers' key; new shape sketched below) and handle rows not passed as a list

* Add fallback for non-list rows in from_json

* Linting

* Linting II

---------

Co-authored-by: Michael Franklin <[email protected]>
illusional authored Mar 8, 2024
1 parent f67695d commit 8b7b69e
Showing 3 changed files with 71 additions and 38 deletions.
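
The config-format fix changes the accessor config that get_parser_instance reads: each submitting user now maps to an object whose 'parsers' key holds the list of parser entries, rather than to that list directly (see the updated type annotation in etl/load/main.py and the test fixtures further down). A minimal sketch of the new shape as a Python dict, with a hypothetical user key and values mirroring the test fixtures:

    {
        '[email protected]': {
            'parsers': [
                {
                    'name': 'test/v1',
                    'parser_name': 'test',
                    'default_parameters': {'project': 'test'},
                }
            ]
        }
    }

Previously the same parser entries sat directly under the user key as a bare list.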
46 changes: 34 additions & 12 deletions etl/load/main.py
@@ -1,6 +1,7 @@
import asyncio
import base64
import datetime
import importlib.metadata
import json
import logging
import os
@@ -10,7 +11,6 @@
import flask
import functions_framework
import google.cloud.bigquery as bq
import pkg_resources
from google.cloud import pubsub_v1, secretmanager

from metamist.parser.generic_parser import GenericParser # type: ignore
@@ -25,14 +25,23 @@

@lru_cache
def _get_bq_client():
assert BIGQUERY_TABLE, 'BIGQUERY_TABLE is not set'
assert BIGQUERY_LOG_TABLE, 'BIGQUERY_LOG_TABLE is not set'
return bq.Client()


@lru_cache
def _get_secret_manager():
assert ETL_ACCESSOR_CONFIG_SECRET, 'CONFIGURATION_SECRET is not set'
return secretmanager.SecretManagerServiceClient()


@lru_cache
def _get_pubsub_client():
assert NOTIFICATION_PUBSUB_TOPIC, 'NOTIFICATION_PUBSUB_TOPIC is not set'
return pubsub_v1.PublisherClient()


class ParsingStatus:
"""
Enum type to distinguish between success and failure of parsing
@@ -65,7 +74,7 @@ def call_parser(parser_obj, row_json) -> tuple[str, str]:
async def run_parser_capture_result(parser_obj, row_data, res, status):
try:
# TODO better error handling
r = await parser_obj.from_json([row_data], confirm=False, dry_run=True)
r = await parser_obj.from_json(row_data, confirm=False)
res.append(r)
status.append(ParsingStatus.SUCCESS)
except Exception as e: # pylint: disable=broad-exception-caught
@@ -148,7 +157,7 @@ def process_rows(
# publish to notification pubsub
msg_title = 'Metamist ETL Load Failed'
try:
pubsub_client = pubsub_v1.PublisherClient()
pubsub_client = _get_pubsub_client()
pubsub_client.publish(
NOTIFICATION_PUBSUB_TOPIC,
json.dumps({'title': msg_title} | log_record).encode(),
@@ -212,6 +221,15 @@ def etl_load(request: flask.Request):
'message': f'Missing or empty request_id: {jbody_str}',
}, 400

return process_request(request_id, delivery_attempt)


def process_request(
request_id: str, delivery_attempt: int | None = None
) -> tuple[dict, int]:
"""
Process request_id, delivery_attempt and return result
"""
# locate the request_id in bq
query = f"""
SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id
@@ -322,13 +340,16 @@ def get_parser_instance(

accessor_config: dict[
str,
list[
dict[
Literal['name']
| Literal['parser_name']
| Literal['default_parameters'],
Any,
]
dict[
Literal['parsers'],
list[
dict[
Literal['name']
| Literal['parser_name']
| Literal['default_parameters'],
Any,
]
],
],
] = get_accessor_config()

@@ -341,7 +362,7 @@ def prepare_parser_map() -> dict[str, type[GenericParser]]:
etl_accessor_config = next(
(
accessor_config
for accessor_config in accessor_config[submitting_user]
for accessor_config in accessor_config[submitting_user].get('parsers', [])
if accessor_config['name'].strip(STRIP_CHARS)
== request_type.strip(STRIP_CHARS)
),
@@ -387,7 +408,8 @@ def prepare_parser_map() -> dict[str, type[GenericParser]]:
loop through metamist_parser entry points and create map of parsers
"""
parser_map = {}
for entry_point in pkg_resources.iter_entry_points('metamist_parser'):

for entry_point in importlib.metadata.entry_points().get('metamist_parser'):
parser_cls = entry_point.load()
parser_short_name, parser_version = parser_cls.get_info()
parser_map[f'{parser_short_name}/{parser_version}'] = parser_cls
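
prepare_parser_map now discovers parsers through importlib.metadata instead of the deprecated pkg_resources. A minimal sketch of the same lookup, assuming Python 3.10+ where entry_points() accepts a group keyword (the dict-style .get() call in the diff targets older interpreters and is deprecated on newer ones); what gets loaded is whatever classes packages register under the 'metamist_parser' entry-point group:

    import importlib.metadata

    def discover_parsers() -> dict[str, type]:
        """Map 'short_name/version' to the parser class for each registered parser."""
        parser_map = {}
        # 'metamist_parser' is the entry-point group iterated by prepare_parser_map
        for entry_point in importlib.metadata.entry_points(group='metamist_parser'):
            parser_cls = entry_point.load()  # imports the class on demand
            short_name, version = parser_cls.get_info()
            parser_map[f'{short_name}/{version}'] = parser_cls
        return parser_map
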
60 changes: 34 additions & 26 deletions etl/test/test_etl_load.py
@@ -200,13 +200,15 @@ def test_get_parser_instance_success(
"""Test get_parser_instance success"""

mock_get_accessor_config.return_value = {
'[email protected]': [
{
'name': 'test/v1',
'default_parameters': {},
# 'parser_name': 'test',
}
]
'[email protected]': {
'parsers': [
{
'name': 'test/v1',
'default_parameters': {},
# 'parser_name': 'test',
}
]
}
}

mock_prepare_parser_map.return_value = {
@@ -234,13 +236,15 @@ def test_get_parser_instance_different_parser_name(
"""Test get_parser_instance success"""

mock_get_accessor_config.return_value = {
'[email protected]': [
{
'name': 'test/v1',
'default_parameters': {'project': 'test'},
'parser_name': 'different_parser/name',
}
]
'[email protected]': {
'parsers': [
{
'name': 'test/v1',
'default_parameters': {'project': 'test'},
'parser_name': 'different_parser/name',
}
]
}
}

mock_prepare_parser_map.return_value = {
@@ -284,12 +288,14 @@ def test_get_parser_no_matching_config(
"""Test get_parser_instance success"""

mock_get_accessor_config.return_value = {
'[email protected]': [
{
'name': 'test/v1',
'default_parameters': {'project': 'test'},
}
]
'[email protected]': {
'parsers': [
{
'name': 'test/v1',
'default_parameters': {'project': 'test'},
}
]
}
}

# this doesn't need to be mocked as it fails before here
@@ -316,12 +322,14 @@ def test_get_parser_no_matching_parser(
"""Test get_parser_instance success"""

mock_get_accessor_config.return_value = {
'[email protected]': [
{
'name': 'a/b',
'default_parameters': {'project': 'test'},
}
]
'[email protected]': {
'parsers': [
{
'name': 'a/b',
'default_parameters': {'project': 'test'},
}
]
}
}

mock_prepare_parser_map.return_value = {
3 changes: 3 additions & 0 deletions metamist/parser/generic_parser.py
@@ -534,6 +534,9 @@ async def from_json(self, rows, confirm=False, dry_run=False):
If no participants are present, groups samples by their IDs.
For each sample, gets its sequencing groups by their keys. For each sequencing group, groups assays and analyses.
"""
if not isinstance(rows, list):
rows = [rows]

await self.validate_rows(rows)

# one participant with no value
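
This guard lets the ETL loader hand a single payload dict straight to from_json (as run_parser_capture_result now does) instead of wrapping it in a one-element list. A minimal standalone sketch of the coercion, with a hypothetical row payload:

    def _ensure_list(rows):
        # mirrors the guard added to GenericParser.from_json: a single row dict
        # is wrapped so validate_rows and everything downstream always see a list
        if not isinstance(rows, list):
            rows = [rows]
        return rows

    # a bare dict and a one-element list normalise to the same shape
    assert _ensure_list({'sample_id': 'XPG123'}) == [{'sample_id': 'XPG123'}]
    assert _ensure_list([{'sample_id': 'XPG123'}]) == [{'sample_id': 'XPG123'}]
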
