Skip to content

Commit

Permalink
Implemented entry points for metamist_parser, added ability to include extra private python repos in gc functions.
Browse files Browse the repository at this point in the history
  • Loading branch information
milo-hyben committed Sep 15, 2023
1 parent ebbba83 commit ed36096
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 105 deletions.
54 changes: 19 additions & 35 deletions etl/load/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,7 @@

from google.cloud import pubsub_v1

# import all public parsers
import metamist.parser as mp

# try to import private parsers
# try:
# import metamist_private.parser as mpp
# except ImportError:
# pass
import pkg_resources


BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE')
Expand Down Expand Up @@ -70,13 +63,17 @@ def process_rows(query_job_result, delivery_attempt, request_id, parser_map, bq_
# get data from payload or use payload as data
record_data = row_json.get('data', row_json)

parser_obj = get_parser_instance(parser_map, sample_type, config_data)
(parser_obj, err_msg) = get_parser_instance(
parser_map, sample_type, config_data
)
if parser_obj:
# Parse row.body -> Model and upload to metamist database
status, parsing_result = call_parser(parser_obj, record_data)
else:
status = 'FAILED'
parsing_result = f'Missing or invalid sample_type: {sample_type} in the record with id: {request_id}'
parsing_result = (
f'Error: {err_msg} when parsing record with id: {request_id}'
)

if delivery_attempt == 1:
# log only at the first attempt
Expand Down Expand Up @@ -175,7 +172,7 @@ def etl_load(request: flask.Request):
# 'sfmp/v1': <class 'metamist.parser.sample_file_map_parser.SampleFileMapParser'>,
# 'bbv/v1': bbv.BbvV1Parser, TODO: add bbv parser
# }
parser_map = prepare_parser_map(mp.GenericParser, default_version='v1')
parser_map = prepare_parser_map()

# locate the request_id in bq
query = f"""
Expand Down Expand Up @@ -270,12 +267,12 @@ def get_parser_instance(
object | None: _description_
"""
if not sample_type:
return None
return None, 'Empty sample_type'

parser_class_ = parser_map.get(sample_type, None)
if not parser_class_:
# class not found
return None
return None, f'Parser for {sample_type} not found'

# TODO: in case of generic metadata parser, we need to create instance
try:
Expand All @@ -285,32 +282,19 @@ def get_parser_instance(
parser_obj = parser_class_()
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(f'Failed to create parser instance {e}')
return None

return parser_obj


def all_subclasses(cls: object) -> set:
    """Recursively collect every direct and indirect subclass of *cls*.

    Walks ``cls.__subclasses__()`` depth-first so grandchildren (and deeper
    descendants) are included, returning them all as a single set.
    """
    found: set = set()
    for child in cls.__subclasses__():
        found.add(child)
        # recurse so that subclasses-of-subclasses are also gathered
        found |= all_subclasses(child)
    return found
return None, f'Failed to create parser instance {e}'

return parser_obj, None

def prepare_parser_map(cls, default_version='v1'):
"""Prepare parser map for the given class

Args:
cls: class to find subclasses of
version: version of the parser
Returns:
parser map
def prepare_parser_map():
"""Prepare parser map
loop through metamist_parser entry points and create map of parsers
"""
parser_map = {}
for parser_cls in all_subclasses(cls):
parser_code = ''.join(
[ch for ch in str(parser_cls).rsplit('.', maxsplit=1)[-1] if ch.isupper()]
)
parser_map[f'/{parser_code.lower()}/{default_version}'] = parser_cls
for entry_point in pkg_resources.iter_entry_points('metamist_parser'):
parser_cls = entry_point.load()
parser_short_name, parser_version = parser_cls.get_info()
parser_map[f'/{parser_short_name}/{parser_version}'] = parser_cls

return parser_map
Binary file modified etl/load/metamist-6.2.0.tar.gz
Binary file not shown.
2 changes: 2 additions & 0 deletions etl/load/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
keyring
keyrings.google-artifactregistry-auth
3 changes: 2 additions & 1 deletion etl/load/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
-i https://pypi.org/simple
flask
functions_framework
google-cloud-bigquery
google-cloud-logging
google-cloud-pubsub
# will be replaced with metamist once it cotains the parser changes
# will be replaced with metamist once it contains the parser changes
./metamist-6.2.0.tar.gz
62 changes: 2 additions & 60 deletions etl/test/test_etl_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from unittest.mock import MagicMock, patch

import etl.load.main
import metamist.parser as mp


ETL_SAMPLE_RECORD_1 = """
{
Expand Down Expand Up @@ -138,7 +138,7 @@ async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client
{
'id': '1234567890',
'record': json.loads(ETL_SAMPLE_RECORD_2),
'result': 'Missing or invalid sample_type: /bbv/v1 in the record with id: 1234567890',
'result': 'Error: Parser for /bbv/v1 not found when parsing record with id: 1234567890',
'success': False,
},
)
Expand Down Expand Up @@ -202,61 +202,3 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client
'success': True,
},
)

@run_as_sync
async def test_etl_load_parser(
self,
):
"""Test simple parsing of json data
Comment out if you want to test using LOCAL environment,
"""

PARTICIPANT_COL_NAME = 'individual_id'
SAMPLE_ID_COL_NAME = 'sample_id'
SEQ_TYPE_COL_NAME = 'sequencing_type'

sample_meta_map = {
'collection_centre': 'centre',
'collection_date': 'collection_date',
'collection_specimen': 'specimen',
}

default_sequencing_type = 'genome'
default_sequencing_technology = 'short-read'

# parser =
mp.GenericMetadataParser(
search_locations=[],
project=self.project_name,
participant_column=PARTICIPANT_COL_NAME,
sample_name_column=SAMPLE_ID_COL_NAME,
reads_column=None,
checksum_column=None,
seq_type_column=SEQ_TYPE_COL_NAME,
default_sequencing_type=default_sequencing_type,
default_sample_type='blood',
default_sequencing_technology=default_sequencing_technology,
default_reference_assembly_location=None,
participant_meta_map={},
sample_meta_map=sample_meta_map,
assay_meta_map={},
qc_meta_map={},
allow_extra_files_in_search_path=None,
key_map=None,
)

# json_data = [
# {
# 'sample_id': '123456',
# 'external_id': 'GRK100311',
# 'individual_id': '608',
# 'sequencing_type': 'exome',
# 'collection_centre': 'KCCG',
# 'collection_date': '2023-08-05T01:39:28.611476',
# 'collection_specimen': 'blood',
# }
# ]
# res = await parser.from_json(json_data, confirm=False, dry_run=True)
# print(res)

# assert False
5 changes: 0 additions & 5 deletions metamist/parser/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
# Parser package for the metamist project
from .cloudhelper import CloudHelper
from .generic_metadata_parser import GenericMetadataParser
from .generic_parser import GenericParser
from .sample_file_map_parser import SampleFileMapParser
7 changes: 7 additions & 0 deletions metamist/parser/generic_metadata_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,13 @@ async def get_analyses_from_sequencing_group(
)
]

@staticmethod
def get_info():
    """
    Information about parser, including short name and version.

    Returns:
        tuple: ``(short_name, version)`` — here ``('gmp', 'v1')``, used to
        build the parser-map key of the form ``/<short_name>/<version>``.
    """
    return 'gmp', 'v1'


@click.command(help=__DOC)
@click.option(
Expand Down
7 changes: 7 additions & 0 deletions metamist/parser/sample_file_map_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ def get_sample_id(self, row: SingleRow) -> str:

return self.get_participant_id(row)

@staticmethod
def get_info():
    """
    Information about parser, including short name and version.

    Returns:
        tuple: ``(short_name, version)`` — here ``('sfmp', 'v1')``, used to
        build the parser-map key of the form ``/<short_name>/<version>``.
    """
    return 'sfmp', 'v1'


@click.command(help=__DOC)
@click.option(
Expand Down
40 changes: 36 additions & 4 deletions metamist_infrastructure/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,25 @@
PATH_TO_ETL_BQ_LOG_SCHEMA = ETL_FOLDER / 'bq_log_schema.json'


# TODO: update implementation in cpg_infra project to enable binary files
# TODO: update implementation in cpg_infra project to enable binary files & private repos?
def append_private_repositories_to_requirements(
    file_content: str, private_repo_url: str, private_repos: str
) -> str:
    """
    Append private repositories to requirements.txt.

    Adds a pip ``--extra-index-url`` line pointing at *private_repo_url*,
    followed by the *private_repos* package name(s), to the given
    requirements-file content.

    Args:
        file_content: existing text of the requirements file.
        private_repo_url: URL of the private package index.
        private_repos: package name(s) to install from that index.

    Returns:
        str: the requirements content with the extra-index section appended.
    """
    extra_section = f'\n--extra-index-url {private_repo_url}\n{private_repos}\n'
    return file_content + extra_section


def archive_folder(
path: str, allowed_extensions: frozenset[str]
path: str,
allowed_extensions: frozenset[str],
private_repo_url: str,
private_repos: str,
) -> pulumi.AssetArchive:
"""Archive a folder into a pulumi asset archive"""
assets = {}
Expand All @@ -48,7 +64,12 @@ def archive_folder(
else:
with open(filename, encoding='utf-8') as file:
# do it this way to stop any issues with changing paths
assets[filename] = pulumi.StringAsset(file.read())
file_content = file.read()
if filename == 'requirements.txt' and private_repo_url:
file_content = append_private_repositories_to_requirements(
file_content, private_repo_url, private_repos
)
assets[filename] = pulumi.StringAsset(file_content)
return pulumi.AssetArchive(assets)


Expand All @@ -61,6 +82,13 @@ def main(self):
"""Driver for the metamist infrastructure as code plugin"""
# todo, eventually configure metamist cloud run server
# to be deployed here, but for now it's manually deployed

# TODO: the following should be added to SampleMetadataConfig
self.extra_sample_metadata_config = {
'private_repo_url': 'https://australia-southeast1-python.pkg.dev/milo-dev-396001/python-repo/simple',
'private_repos': 'metamist_private',
'environment': 'DEVELOPMENT',
}
self._setup_etl()

@cached_property
Expand Down Expand Up @@ -480,6 +508,9 @@ def _etl_function(self, f_name: str, sa: object):
archive = archive_folder(
str(path_to_func_folder.absolute()),
allowed_extensions=frozenset({'.gz', '.py', '.txt', '.json'}),
# TODO replace with metamist config, once it's available
private_repo_url=self.extra_sample_metadata_config['private_repo_url'],
private_repos=self.extra_sample_metadata_config['private_repos'],
)

# Create the single Cloud Storage object,
Expand Down Expand Up @@ -535,7 +566,8 @@ def _etl_function(self, f_name: str, sa: object):
'NOTIFICATION_PUBSUB_TOPIC': self.etl_slack_notification_topic.id
if self.etl_slack_notification_topic
else '',
'SM_ENVIRONMENT': 'DEVELOPMENT', # TODO: make it configurable
# TODO replace with metamist config, once it's available
'SM_ENVIRONMENT': self.extra_sample_metadata_config['environment'],
},
ingress_settings='ALLOW_ALL',
all_traffic_on_latest_revision=True,
Expand Down
6 changes: 6 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
'cpg-utils >= 4.9.4',
'gql[aiohttp,requests]',
],
entry_points={
'metamist_parser': [
'GenericMetadataParser = metamist.parser.generic_metadata_parser:GenericMetadataParser',
'SampleFileMapParser = metamist.parser.sample_file_map_parser:SampleFileMapParser',
],
},
include_package_data=True,
zip_safe=False,
keywords='bioinformatics',
Expand Down

0 comments on commit ed36096

Please sign in to comment.