Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Release 6.8.0 #685

Merged
Merged 5 commits
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 6.7.0
current_version = 6.8.0
commit = True
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>[A-z0-9-]+)
Expand Down
2 changes: 1 addition & 1 deletion api/routes/sequencing_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async def get_all_sequencing_group_ids_by_sample_by_type(
}


@router.get('/project/{sequencing_group_id}', operation_id='updateSequencingGroup')
@router.patch('/project/{sequencing_group_id}', operation_id='updateSequencingGroup')
async def update_sequencing_group(
sequencing_group_id: str,
sequencing_group: SequencingGroupMetaUpdateModel,
Expand Down
2 changes: 1 addition & 1 deletion api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from db.python.utils import get_logger

# This tag is automatically updated by bump2version
_VERSION = '6.7.0'
_VERSION = '6.8.0'

logger = get_logger()

Expand Down
2 changes: 1 addition & 1 deletion deploy/python/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.7.0
6.8.0
136 changes: 101 additions & 35 deletions etl/load/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,33 @@
import json
import logging
import os
from typing import Any
from functools import lru_cache
from typing import Any, Literal

import flask
import functions_framework
import google.cloud.bigquery as bq
import pkg_resources
from google.cloud import pubsub_v1 # type: ignore
from google.cloud import pubsub_v1, secretmanager

from metamist.parser.generic_parser import GenericParser # type: ignore

# strip whitespace, newlines and '/' for template matching
STRIP_CHARS = '/ \n'
BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE')
NOTIFICATION_PUBSUB_TOPIC = os.getenv('NOTIFICATION_PUBSUB_TOPIC')
DEFAULT_LOAD_CONFIG = os.getenv('DEFAULT_LOAD_CONFIG', '{}')
ETL_ACCESSOR_CONFIG_SECRET = os.getenv('CONFIGURATION_SECRET')


@lru_cache
def _get_bq_client():
return bq.Client()


@lru_cache
def _get_secret_manager():
return secretmanager.SecretManagerServiceClient()


class ParsingStatus:
Expand All @@ -27,6 +42,17 @@ class ParsingStatus:
FAILED = 'FAILED'


@lru_cache
def get_accessor_config() -> dict:
"""
Read the secret from the full secret path: ETL_ACCESSOR_CONFIG_SECRET
"""
response = _get_secret_manager().access_secret_version(
request={'name': ETL_ACCESSOR_CONFIG_SECRET}
)
return json.loads(response.payload.data.decode('UTF-8'))


def call_parser(parser_obj, row_json) -> tuple[str, str]:
"""
This function calls parser_obj.from_json and returns status and result
Expand Down Expand Up @@ -54,9 +80,8 @@ async def run_parser_capture_result(parser_obj, row_data, res, status):

def process_rows(
bq_row: bq.table.Row,
delivery_attempt: int,
delivery_attempt: int | None,
request_id: str,
parser_map: dict,
bq_client: bq.Client,
) -> tuple[str, Any, Any]:
"""
Expand All @@ -66,17 +91,19 @@ def process_rows(
# source_type should be in the format /ParserName/Version e.g.: /bbv/v1

row_json = json.loads(bq_row.body)
submitting_user = bq_row.submitting_user

# get config from payload and merge with the default
config_data = json.loads(DEFAULT_LOAD_CONFIG)
payload_config_data = row_json.get('config')
if payload_config_data:
config_data.update(payload_config_data)
config = {}
if payload_config_data := row_json.get('config'):
config.update(payload_config_data)

# get data from payload or use payload as data
record_data = row_json.get('data', row_json)

(parser_obj, err_msg) = get_parser_instance(parser_map, source_type, config_data)
(parser_obj, err_msg) = get_parser_instance(
submitting_user=submitting_user, request_type=source_type, init_params=config
)

if parser_obj:
# Parse bq_row.body -> Model and upload to metamist database
Expand Down Expand Up @@ -185,14 +212,6 @@ def etl_load(request: flask.Request):
'message': f'Missing or empty request_id: {jbody_str}',
}, 400

# prepare parser map
# parser_map = {
# 'gmp/v1': <class 'metamist.parser.generic_metadata_parser.GenericMetadataParser'>,
# 'sfmp/v1': <class 'metamist.parser.sample_file_map_parser.SampleFileMapParser'>,
# 'bbv/v1': bbv.BbvV1Parser, TODO: add bbv parser
# }
parser_map = prepare_parser_map()

# locate the request_id in bq
query = f"""
SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id
Expand All @@ -201,12 +220,13 @@ def etl_load(request: flask.Request):
bq.ScalarQueryParameter('request_id', 'STRING', request_id),
]

bq_client = bq.Client()
bq_client = _get_bq_client()

job_config = bq.QueryJobConfig()
job_config.query_parameters = query_params
query_job_result = bq_client.query(query, job_config=job_config).result()

if query_job_result.total_rows == 0:
if not query_job_result.total_rows or query_job_result.total_rows == 0:
# Request ID not found
return {
'success': False,
Expand All @@ -224,7 +244,7 @@ def etl_load(request: flask.Request):
bq_job_result = list(query_job_result)[0]

(status, parsing_result, uploaded_record) = process_rows(
bq_job_result, delivery_attempt, request_id, parser_map, bq_client
bq_job_result, delivery_attempt, request_id, bq_client
)

# return success
Expand Down Expand Up @@ -285,45 +305,91 @@ def extract_request_id(jbody: dict[str, Any]) -> tuple[int | None, str | None]:


def get_parser_instance(
parser_map: dict, source_type: str | None, init_params: dict | None
) -> tuple[object | None, str | None]:
submitting_user: str, request_type: str | None, init_params: dict
) -> tuple[GenericParser | None, str | None]:
"""Extract parser name from source_type

Args:
source_type (str | None): _description_
parser_type (str | None): The name of the config.etl.accessors.name to match

Returns:
object | None: _description_
"""
if not source_type:
return None, 'Empty source_type'
if not request_type:
return None, f'No "type" was provided on the request from {submitting_user}'

# check that submitting_user has access to parser

accessor_config: dict[
str,
list[
dict[
Literal['name']
| Literal['parser_name']
| Literal['default_parameters'],
Any,
]
],
] = get_accessor_config()

if submitting_user not in accessor_config:
return None, (
f'Submitting user {submitting_user} is not allowed to access any parsers'
)

# find the config
etl_accessor_config = next(
(
accessor_config
for accessor_config in accessor_config[submitting_user]
if accessor_config['name'].strip(STRIP_CHARS)
== request_type.strip(STRIP_CHARS)
),
None,
)
if not etl_accessor_config:
return None, (
f'Submitting user {submitting_user} is not allowed to access {request_type}'
)

parser_class_ = parser_map.get(source_type, None)
parser_name = (etl_accessor_config.get('parser_name') or request_type).strip(
STRIP_CHARS
)

init_params.update(etl_accessor_config.get('default_parameters', {}))

parser_map = prepare_parser_map()

parser_class_ = parser_map.get(parser_name, None)
if not parser_class_:
# class not found
return None, f'Parser for {source_type} not found'
if request_type.strip(STRIP_CHARS) != parser_name:
return None, (
f'Submitting user {submitting_user} could not find parser for '
f'request type {request_type}, for parser: {parser_name}'
)
return (
None,
f'Submitting user {submitting_user} could not find parser for {request_type}',
)

# TODO: in case of generic metadata parser, we need to create instance
try:
if init_params:
parser_obj = parser_class_(**init_params)
else:
parser_obj = parser_class_()
parser_obj = parser_class_(**(init_params or {}))
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to create parser instance {e}')
return None, f'Failed to create parser instance {e}'

return parser_obj, None


def prepare_parser_map() -> dict:
def prepare_parser_map() -> dict[str, type[GenericParser]]:
"""Prepare parser map
loop through metamist_parser entry points and create map of parsers
"""
parser_map = {}
for entry_point in pkg_resources.iter_entry_points('metamist_parser'):
parser_cls = entry_point.load()
parser_short_name, parser_version = parser_cls.get_info()
parser_map[f'/{parser_short_name}/{parser_version}'] = parser_cls
parser_map[f'{parser_short_name}/{parser_version}'] = parser_cls

return parser_map
Loading
Loading