Skip to content

Commit

Permalink
Support for ETL parser configurations (#678)
Browse files Browse the repository at this point in the history
* Add support for etl parser configuration

* Untested remap service-account to ID

* Small fix

* Add asserts + fix config

* Add type annotations and improve parser_name variable

* Move immediately instantiated clients to fix tests

* Fix random linting things

* Add new + refactor tests to reduce duplication

* Address review feedback

* Update metamist_infra version to 1.1.0

---------

Co-authored-by: Michael Franklin <[email protected]>
  • Loading branch information
illusional and illusional authored Feb 12, 2024
1 parent 5debebf commit bea77e3
Show file tree
Hide file tree
Showing 5 changed files with 523 additions and 196 deletions.
136 changes: 101 additions & 35 deletions etl/load/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,33 @@
import json
import logging
import os
from typing import Any
from functools import lru_cache
from typing import Any, Literal

import flask
import functions_framework
import google.cloud.bigquery as bq
import pkg_resources
from google.cloud import pubsub_v1 # type: ignore
from google.cloud import pubsub_v1, secretmanager

from metamist.parser.generic_parser import GenericParser # type: ignore

# strip whitespace, newlines and '/' for template matching
STRIP_CHARS = '/ \n'
BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE')
NOTIFICATION_PUBSUB_TOPIC = os.getenv('NOTIFICATION_PUBSUB_TOPIC')
DEFAULT_LOAD_CONFIG = os.getenv('DEFAULT_LOAD_CONFIG', '{}')
ETL_ACCESSOR_CONFIG_SECRET = os.getenv('CONFIGURATION_SECRET')


@lru_cache
def _get_bq_client():
    """Lazily construct and memoise a single shared BigQuery client."""
    # Building the client on first call (rather than at import time) keeps
    # module import cheap and test-friendly; lru_cache makes it a singleton.
    client = bq.Client()
    return client


@lru_cache
def _get_secret_manager():
    """Lazily construct and memoise a single Secret Manager client."""
    # Deferred instantiation mirrors _get_bq_client: no network/auth work
    # happens at import time, and all callers share one cached client.
    client = secretmanager.SecretManagerServiceClient()
    return client


class ParsingStatus:
Expand All @@ -27,6 +42,17 @@ class ParsingStatus:
FAILED = 'FAILED'


@lru_cache
def get_accessor_config() -> dict:
    """
    Read the secret from the full secret path: ETL_ACCESSOR_CONFIG_SECRET

    The path comes from the CONFIGURATION_SECRET environment variable and the
    parsed JSON is cached for the lifetime of the process.

    Returns:
        dict: parsed accessor configuration (JSON payload of the secret).

    Raises:
        ValueError: if the CONFIGURATION_SECRET environment variable is unset,
            so the failure is explicit instead of an opaque Secret Manager
            API error about a None resource name.
    """
    if not ETL_ACCESSOR_CONFIG_SECRET:
        raise ValueError(
            'CONFIGURATION_SECRET environment variable is not set, '
            'cannot load the ETL accessor configuration'
        )
    response = _get_secret_manager().access_secret_version(
        request={'name': ETL_ACCESSOR_CONFIG_SECRET}
    )
    return json.loads(response.payload.data.decode('UTF-8'))


def call_parser(parser_obj, row_json) -> tuple[str, str]:
"""
This function calls parser_obj.from_json and returns status and result
Expand Down Expand Up @@ -54,9 +80,8 @@ async def run_parser_capture_result(parser_obj, row_data, res, status):

def process_rows(
bq_row: bq.table.Row,
delivery_attempt: int,
delivery_attempt: int | None,
request_id: str,
parser_map: dict,
bq_client: bq.Client,
) -> tuple[str, Any, Any]:
"""
Expand All @@ -66,17 +91,19 @@ def process_rows(
# source_type should be in the format /ParserName/Version e.g.: /bbv/v1

row_json = json.loads(bq_row.body)
submitting_user = bq_row.submitting_user

# get config from payload and merge with the default
config_data = json.loads(DEFAULT_LOAD_CONFIG)
payload_config_data = row_json.get('config')
if payload_config_data:
config_data.update(payload_config_data)
config = {}
if payload_config_data := row_json.get('config'):
config.update(payload_config_data)

# get data from payload or use payload as data
record_data = row_json.get('data', row_json)

(parser_obj, err_msg) = get_parser_instance(parser_map, source_type, config_data)
(parser_obj, err_msg) = get_parser_instance(
submitting_user=submitting_user, request_type=source_type, init_params=config
)

if parser_obj:
# Parse bq_row.body -> Model and upload to metamist database
Expand Down Expand Up @@ -185,14 +212,6 @@ def etl_load(request: flask.Request):
'message': f'Missing or empty request_id: {jbody_str}',
}, 400

# prepare parser map
# parser_map = {
# 'gmp/v1': <class 'metamist.parser.generic_metadata_parser.GenericMetadataParser'>,
# 'sfmp/v1': <class 'metamist.parser.sample_file_map_parser.SampleFileMapParser'>,
# 'bbv/v1': bbv.BbvV1Parser, TODO: add bbv parser
# }
parser_map = prepare_parser_map()

# locate the request_id in bq
query = f"""
SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id
Expand All @@ -201,12 +220,13 @@ def etl_load(request: flask.Request):
bq.ScalarQueryParameter('request_id', 'STRING', request_id),
]

bq_client = bq.Client()
bq_client = _get_bq_client()

job_config = bq.QueryJobConfig()
job_config.query_parameters = query_params
query_job_result = bq_client.query(query, job_config=job_config).result()

if query_job_result.total_rows == 0:
if not query_job_result.total_rows or query_job_result.total_rows == 0:
# Request ID not found
return {
'success': False,
Expand All @@ -224,7 +244,7 @@ def etl_load(request: flask.Request):
bq_job_result = list(query_job_result)[0]

(status, parsing_result, uploaded_record) = process_rows(
bq_job_result, delivery_attempt, request_id, parser_map, bq_client
bq_job_result, delivery_attempt, request_id, bq_client
)

# return success
Expand Down Expand Up @@ -285,45 +305,91 @@ def extract_request_id(jbody: dict[str, Any]) -> tuple[int | None, str | None]:


def get_parser_instance(
    submitting_user: str, request_type: str | None, init_params: dict
) -> tuple[GenericParser | None, str | None]:
    """
    Resolve and instantiate the parser a submitting user is allowed to use.

    Args:
        submitting_user: identity of the user who submitted the request;
            must be a key in the accessor configuration
        request_type: the name of the config.etl.accessors.name to match
        init_params: constructor kwargs for the parser; updated in place
            with the accessor's default_parameters (defaults take precedence)

    Returns:
        (parser instance, None) on success, otherwise (None, error message)
        describing the authorisation or lookup failure.
    """
    if not request_type:
        return None, f'No "type" was provided on the request from {submitting_user}'

    # check that submitting_user has access to parser
    accessor_config: dict[
        str,
        list[
            dict[
                Literal['name']
                | Literal['parser_name']
                | Literal['default_parameters'],
                Any,
            ]
        ],
    ] = get_accessor_config()

    if submitting_user not in accessor_config:
        return None, (
            f'Submitting user {submitting_user} is not allowed to access any parsers'
        )

    # find the accessor entry whose name matches the requested type
    # (renamed loop variable: the original shadowed `accessor_config`)
    etl_accessor_config = next(
        (
            entry
            for entry in accessor_config[submitting_user]
            if entry['name'].strip(STRIP_CHARS) == request_type.strip(STRIP_CHARS)
        ),
        None,
    )
    if not etl_accessor_config:
        return None, (
            f'Submitting user {submitting_user} is not allowed to access {request_type}'
        )

    # the accessor entry may remap the request type to a differently
    # named parser via 'parser_name'; fall back to the request type itself
    parser_name = (etl_accessor_config.get('parser_name') or request_type).strip(
        STRIP_CHARS
    )

    # accessor-level defaults override caller-supplied parameters
    init_params.update(etl_accessor_config.get('default_parameters', {}))

    parser_map = prepare_parser_map()

    parser_class_ = parser_map.get(parser_name, None)
    if not parser_class_:
        # class not found: distinguish "remapped parser missing" from
        # "requested parser missing" so the error names the right thing
        if request_type.strip(STRIP_CHARS) != parser_name:
            return None, (
                f'Submitting user {submitting_user} could not find parser for '
                f'request type {request_type}, for parser: {parser_name}'
            )
        return (
            None,
            f'Submitting user {submitting_user} could not find parser for {request_type}',
        )

    try:
        parser_obj = parser_class_(**(init_params or {}))
    except Exception as e:  # pylint: disable=broad-exception-caught
        logging.error(f'Failed to create parser instance {e}')
        return None, f'Failed to create parser instance {e}'

    return parser_obj, None


def prepare_parser_map() -> dict[str, type[GenericParser]]:
    """Prepare parser map

    Loop through 'metamist_parser' entry points and build a map of parsers
    keyed by '<short_name>/<version>' (no leading slash, matching the
    STRIP_CHARS-stripped lookups in get_parser_instance).
    """
    parser_map: dict[str, type[GenericParser]] = {}
    for entry_point in pkg_resources.iter_entry_points('metamist_parser'):
        parser_cls = entry_point.load()
        parser_short_name, parser_version = parser_cls.get_info()
        # keep only the unprefixed key; the '/'-prefixed variant was
        # leftover diff residue inconsistent with the stripped lookups
        parser_map[f'{parser_short_name}/{parser_version}'] = parser_cls

    return parser_map
Loading

0 comments on commit bea77e3

Please sign in to comment.