From bea77e3907121c85dc0e22a640e01e807075717f Mon Sep 17 00:00:00 2001 From: Michael Franklin <22381693+illusional@users.noreply.github.com> Date: Tue, 13 Feb 2024 08:58:39 +1100 Subject: [PATCH 1/5] Support for ETL parser configurations (#678) * Add support for etl parser configuration * Untested remap service-account to ID * Small fix * Add asserts + fix config * Add type annotations and improve parser_name variable * Move immediately instantiated clients to fix tests * Fix random linting things * Add new + refactor tests to reduce duplication * Address review feedback * Update metamist_infra version to 1.1.0 --------- Co-authored-by: Michael Franklin --- etl/load/main.py | 136 +++++-- etl/test/test_etl_load.py | 373 ++++++++++++------ metamist_infrastructure/driver.py | 199 ++++++++-- metamist_infrastructure/setup.py | 2 +- metamist_infrastructure/slack_notification.py | 9 +- 5 files changed, 523 insertions(+), 196 deletions(-) diff --git a/etl/load/main.py b/etl/load/main.py index 58d7ea5ce..65e7a2dc6 100644 --- a/etl/load/main.py +++ b/etl/load/main.py @@ -4,18 +4,33 @@ import json import logging import os -from typing import Any +from functools import lru_cache +from typing import Any, Literal import flask import functions_framework import google.cloud.bigquery as bq import pkg_resources -from google.cloud import pubsub_v1 # type: ignore +from google.cloud import pubsub_v1, secretmanager +from metamist.parser.generic_parser import GenericParser # type: ignore + +# strip whitespace, newlines and '/' for template matching +STRIP_CHARS = '/ \n' BIGQUERY_TABLE = os.getenv('BIGQUERY_TABLE') BIGQUERY_LOG_TABLE = os.getenv('BIGQUERY_LOG_TABLE') NOTIFICATION_PUBSUB_TOPIC = os.getenv('NOTIFICATION_PUBSUB_TOPIC') -DEFAULT_LOAD_CONFIG = os.getenv('DEFAULT_LOAD_CONFIG', '{}') +ETL_ACCESSOR_CONFIG_SECRET = os.getenv('CONFIGURATION_SECRET') + + +@lru_cache +def _get_bq_client(): + return bq.Client() + + +@lru_cache +def _get_secret_manager(): + return secretmanager.SecretManagerServiceClient() class ParsingStatus: @@ -27,6 +42,17 @@ class ParsingStatus: FAILED = 'FAILED' +@lru_cache +def get_accessor_config() -> dict: + """ + Read the secret from the full secret path: ETL_ACCESSOR_CONFIG_SECRET + """ + response = _get_secret_manager().access_secret_version( + request={'name': ETL_ACCESSOR_CONFIG_SECRET} + ) + return json.loads(response.payload.data.decode('UTF-8')) + + def call_parser(parser_obj, row_json) -> tuple[str, str]: """ This function calls parser_obj.from_json and returns status and result @@ -54,9 +80,8 @@ async def run_parser_capture_result(parser_obj, row_data, res, status): def process_rows( bq_row: bq.table.Row, - delivery_attempt: int, + delivery_attempt: int | None, request_id: str, - parser_map: dict, bq_client: bq.Client, ) -> tuple[str, Any, Any]: """ @@ -66,17 +91,19 @@ def process_rows( # source_type should be in the format /ParserName/Version e.g.: /bbv/v1 row_json = json.loads(bq_row.body) + submitting_user = bq_row.submitting_user # get config from payload and merge with the default - config_data = json.loads(DEFAULT_LOAD_CONFIG) - payload_config_data = row_json.get('config') - if payload_config_data: - config_data.update(payload_config_data) + config = {} + if payload_config_data := row_json.get('config'): + config.update(payload_config_data) # get data from payload or use payload as data record_data = row_json.get('data', row_json) - (parser_obj, err_msg) = get_parser_instance(parser_map, source_type, config_data) + (parser_obj, err_msg) = get_parser_instance( + 
submitting_user=submitting_user, request_type=source_type, init_params=config + ) if parser_obj: # Parse bq_row.body -> Model and upload to metamist database @@ -185,14 +212,6 @@ def etl_load(request: flask.Request): 'message': f'Missing or empty request_id: {jbody_str}', }, 400 - # prepare parser map - # parser_map = { - # 'gmp/v1': , - # 'sfmp/v1': , - # 'bbv/v1': bbv.BbvV1Parser, TODO: add bbv parser - # } - parser_map = prepare_parser_map() - # locate the request_id in bq query = f""" SELECT * FROM `{BIGQUERY_TABLE}` WHERE request_id = @request_id @@ -201,12 +220,13 @@ def etl_load(request: flask.Request): bq.ScalarQueryParameter('request_id', 'STRING', request_id), ] - bq_client = bq.Client() + bq_client = _get_bq_client() + job_config = bq.QueryJobConfig() job_config.query_parameters = query_params query_job_result = bq_client.query(query, job_config=job_config).result() - if query_job_result.total_rows == 0: + if not query_job_result.total_rows or query_job_result.total_rows == 0: # Request ID not found return { 'success': False, @@ -224,7 +244,7 @@ def etl_load(request: flask.Request): bq_job_result = list(query_job_result)[0] (status, parsing_result, uploaded_record) = process_rows( - bq_job_result, delivery_attempt, request_id, parser_map, bq_client + bq_job_result, delivery_attempt, request_id, bq_client ) # return success @@ -285,30 +305,76 @@ def extract_request_id(jbody: dict[str, Any]) -> tuple[int | None, str | None]: def get_parser_instance( - parser_map: dict, source_type: str | None, init_params: dict | None -) -> tuple[object | None, str | None]: + submitting_user: str, request_type: str | None, init_params: dict +) -> tuple[GenericParser | None, str | None]: """Extract parser name from source_type Args: - source_type (str | None): _description_ + parser_type (str | None): The name of the config.etl.accessors.name to match Returns: object | None: _description_ """ - if not source_type: - return None, 'Empty source_type' + if not request_type: + return None, f'No "type" was provided on the request from {submitting_user}' + + # check that submitting_user has access to parser + + accessor_config: dict[ + str, + list[ + dict[ + Literal['name'] + | Literal['parser_name'] + | Literal['default_parameters'], + Any, + ] + ], + ] = get_accessor_config() + + if submitting_user not in accessor_config: + return None, ( + f'Submitting user {submitting_user} is not allowed to access any parsers' + ) + + # find the config + etl_accessor_config = next( + ( + accessor_config + for accessor_config in accessor_config[submitting_user] + if accessor_config['name'].strip(STRIP_CHARS) + == request_type.strip(STRIP_CHARS) + ), + None, + ) + if not etl_accessor_config: + return None, ( + f'Submitting user {submitting_user} is not allowed to access {request_type}' + ) - parser_class_ = parser_map.get(source_type, None) + parser_name = (etl_accessor_config.get('parser_name') or request_type).strip( + STRIP_CHARS + ) + + init_params.update(etl_accessor_config.get('default_parameters', {})) + + parser_map = prepare_parser_map() + + parser_class_ = parser_map.get(parser_name, None) if not parser_class_: # class not found - return None, f'Parser for {source_type} not found' + if request_type.strip(STRIP_CHARS) != parser_name: + return None, ( + f'Submitting user {submitting_user} could not find parser for ' + f'request type {request_type}, for parser: {parser_name}' + ) + return ( + None, + f'Submitting user {submitting_user} could not find parser for {request_type}', + ) - # TODO: in case of generic 
metadata parser, we need to create instance try: - if init_params: - parser_obj = parser_class_(**init_params) - else: - parser_obj = parser_class_() + parser_obj = parser_class_(**(init_params or {})) except Exception as e: # pylint: disable=broad-exception-caught logging.error(f'Failed to create parser instance {e}') return None, f'Failed to create parser instance {e}' @@ -316,7 +382,7 @@ def get_parser_instance( return parser_obj, None -def prepare_parser_map() -> dict: +def prepare_parser_map() -> dict[str, type[GenericParser]]: """Prepare parser map loop through metamist_parser entry points and create map of parsers """ @@ -324,6 +390,6 @@ def prepare_parser_map() -> dict: for entry_point in pkg_resources.iter_entry_points('metamist_parser'): parser_cls = entry_point.load() parser_short_name, parser_version = parser_cls.get_info() - parser_map[f'/{parser_short_name}/{parser_version}'] = parser_cls + parser_map[f'{parser_short_name}/{parser_version}'] = parser_cls return parser_map diff --git a/etl/test/test_etl_load.py b/etl/test/test_etl_load.py index c974c6133..203bc9f33 100644 --- a/etl/test/test_etl_load.py +++ b/etl/test/test_etl_load.py @@ -1,94 +1,86 @@ import base64 import json -from test.testbase import DbIsolatedTest, run_as_sync +from unittest import TestCase from unittest.mock import MagicMock, patch import etl.load.main +from metamist.parser.generic_metadata_parser import GenericMetadataParser -ETL_SAMPLE_RECORD_1 = """ -{ - "identifier": "AB0002", - "name": "j smith", - "age": 50, - "measurement": "98.7", - "observation": "B++", - "receipt_date": "1/02/2023" -} -""" - -ETL_SAMPLE_RECORD_2 = """ -{ - "sample_id": "123456", - "external_id": "AAA000000", - "individual_id": "678", - "sequencing_type": "exome", - "collection_centre": "ABCDEF", - "collection_date": "2023-08-05T01:39:28.611476", - "collection_specimen": "blood" -} -""" - -ETL_SAMPLE_RECORD_3 = """ -{ - "config": - { - "search_locations": [], - "project": "milo-dev", - "participant_column": "individual_id", - "sample_name_column": "sample_id", - "seq_type_column": "sequencing_type", - "default_sequencing_type": "sequencing_type", - "default_sample_type": "blood", - "default_sequencing_technology": "short-read", - "sample_meta_map": - { - "collection_centre": "centre", - "collection_date": "collection_date", - "collection_specimen": "specimen" - }, - "participant_meta_map": {}, - "assay_meta_map": {}, - "qc_meta_map": {} - }, - "data": - { - "sample_id": "123456", - "external_id": "AAA000000", - "individual_id": "678", - "sequencing_type": "exome", - "collection_centre": "ABCDEF", - "collection_date": "2023-08-05T01:39:28.611476", - "collection_specimen": "blood" - } -} -""" - - -class TestEtlLoad(DbIsolatedTest): + +class TestGetParserInstance(GenericMetadataParser): + """ + A very basic parser for testing purposes. 
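    The tests below resolve this parser through the accessor-configuration secret read by
    get_parser_instance. A minimal sketch of one configuration entry, with the user email
    and values illustrative only:

        {
            "user@test.com": [
                {
                    "name": "test/v1",
                    "parser_name": "test/v1",
                    "default_parameters": {"project": "test"}
                }
            ]
        }

    "parser_name" is optional and falls back to the request type; "default_parameters"
    take precedence over any config supplied in the request payload.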
+ """ + + def __init__( + self, + project: str, + ): + super().__init__( + project=project, + sample_name_column='sample', + search_locations=[], + participant_meta_map={}, + sample_meta_map={}, + assay_meta_map={}, + qc_meta_map={}, + allow_extra_files_in_search_path=False, + ) + + def get_sample_id(self, row) -> str: + """Get external sample ID from row""" + return row['sample'] + + @staticmethod + def get_info() -> tuple[str, str]: + """ + Information about parser, including short name and version + """ + return ('test', 'v1') + + +class TestEtlLoad(TestCase): """Test etl cloud functions""" - @run_as_sync - async def setUp(self) -> None: - super().setUp() + @staticmethod + def get_query_job_result( + mock_bq, + request_type: str | None = None, + body: str | None = None, + submitting_user: str | None = None, + ): + """Helper function to mock the bigquery response for etl_load tests""" + + query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) + + if request_type or body or submitting_user: + query_row = MagicMock(args={}, spec=['body', 'type', 'submitting_user']) + query_row.body = body + query_row.type = request_type + query_row.submitting_user = submitting_user + + query_job_result.total_rows = 1 + query_job_result.__iter__.return_value = [query_row] + else: + query_job_result.total_rows = 0 + query_job_result.__iter__.return_value = [] + + mock_bq.return_value.query.return_value.result.return_value = query_job_result + + @patch('etl.load.main._get_bq_client', autospec=True) + def test_etl_load_not_found_record(self, mock_bq): + """ + Test etl load, where the bigquery record is not found + """ + + # mock should return no rows + self.get_query_job_result(mock_bq) - @run_as_sync - @patch('etl.load.main.bq.Client', autospec=True) - async def test_etl_load_not_found_record(self, bq_client): - """Test etl load not found""" request = MagicMock( args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json'] ) request.get_json.return_value = {'request_id': '1234567890'} - query_job_result = MagicMock(items=[], args={}, spec=[]) - query_job_result.total_rows = 0 - - query_result = MagicMock(args={}, spec=['result']) - query_result.result.return_value = query_job_result - - bq_client_instance = bq_client.return_value - bq_client_instance.query.return_value = query_result - response, status = etl.load.main.etl_load(request) self.assertEqual(status, 412) self.assertEqual( @@ -99,32 +91,31 @@ async def test_etl_load_not_found_record(self, bq_client): }, ) - @run_as_sync - @patch('etl.load.main.bq.Client', autospec=True) - @patch('etl.load.main.call_parser') - async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client): + @patch('etl.load.main._get_bq_client', autospec=True) + @patch('etl.load.main.get_parser_instance') + def test_etl_load_found_record_simple_payload_with_not_found_parser( + self, mock_get_parser_instance, mock_bq + ): """Test etl load simple payload""" - request = MagicMock( - args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json'] - ) - request.get_json.return_value = {'request_id': '1234567890'} - query_row = MagicMock(args={}, spec=['body', 'type', 'submitting_user']) - query_row.body = ETL_SAMPLE_RECORD_2 - query_row.type = '/bbv/v1' - query_row.submitting_user = 'user@mail.com' - - query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) - query_job_result.total_rows = 1 - query_job_result.__iter__.return_value = [query_row] + record = {'sample': '12345'} - query_result = MagicMock(args={}, spec=['result']) - 
query_result.result.return_value = query_job_result + # mock the bigquery response + self.get_query_job_result( + mock_bq, + request_type='/bbv/v1', + submitting_user='user@mail.com', + body=json.dumps(record), + ) - bq_client_instance = bq_client.return_value - bq_client_instance.query.return_value = query_result + # we test the parsing functionality below + mock_get_parser_instance.return_value = (None, 'Parser for /bbv/v1 not found') - call_parser.return_value = (etl.load.main.ParsingStatus.SUCCESS, '') + # the "web" request + request = MagicMock( + args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json'] + ) + request.get_json.return_value = {'request_id': '1234567890'} response, status = etl.load.main.etl_load(request) @@ -134,16 +125,18 @@ async def test_etl_load_found_record_simple_payload(self, call_parser, bq_client response, { 'id': '1234567890', - 'record': json.loads(ETL_SAMPLE_RECORD_2), + 'record': record, 'result': 'Error: Parser for /bbv/v1 not found when parsing record with id: 1234567890', 'success': False, }, ) - @run_as_sync - @patch('etl.load.main.bq.Client', autospec=True) + @patch('etl.load.main._get_bq_client', autospec=True) + @patch('etl.load.main.get_parser_instance') @patch('etl.load.main.call_parser') - async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client): + def test_etl_load_found_record_pubsub_payload( + self, mock_call_parser, mock_get_parser_instance, mock_bq + ): """Test etl load pubsub payload""" request = MagicMock( args={}, spec=['__len__', 'toJSON', 'authorization', 'get_json'] @@ -170,22 +163,21 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client request.get_json.return_value = pubsub_payload_example - query_row = MagicMock(args={}, spec=['body', 'type', 'submitting_user']) - query_row.body = ETL_SAMPLE_RECORD_3 - query_row.type = '/gmp/v1' - query_row.submitting_user = 'user@mail.com' - - query_job_result = MagicMock(args={}, spec=['__iter__', '__next__']) - query_job_result.total_rows = 1 - query_job_result.__iter__.return_value = [query_row] - - query_result = MagicMock(args={}, spec=['result']) - query_result.result.return_value = query_job_result + record = { + 'config': {'project': 'test'}, + 'data': {'sample': '123456'}, + } - bq_client_instance = bq_client.return_value - bq_client_instance.query.return_value = query_result + # mock the bigquery response + self.get_query_job_result( + mock_bq=mock_bq, + request_type='/gmp/v1', + submitting_user='user@mail.com', + body=json.dumps(record), + ) - call_parser.return_value = (etl.load.main.ParsingStatus.SUCCESS, '') + mock_get_parser_instance.return_value = (TestGetParserInstance, None) + mock_call_parser.return_value = (etl.load.main.ParsingStatus.SUCCESS, '') response, status = etl.load.main.etl_load(request) @@ -194,8 +186,155 @@ async def test_etl_load_found_record_pubsub_payload(self, call_parser, bq_client response, { 'id': '6dc4b9ae-74ee-42ee-9298-b0a51d5c6836', - 'record': json.loads(ETL_SAMPLE_RECORD_3), + 'record': record, 'result': '', 'success': True, }, ) + + @patch('etl.load.main.get_accessor_config') + @patch('etl.load.main.prepare_parser_map') + def test_get_parser_instance_success( + self, mock_prepare_parser_map, mock_get_accessor_config + ): + """Test get_parser_instance success""" + + mock_get_accessor_config.return_value = { + 'user@test.com': [ + { + 'name': 'test/v1', + 'default_parameters': {}, + # 'parser_name': 'test', + } + ] + } + + mock_prepare_parser_map.return_value = { + 'test/v1': 
TestGetParserInstance, + } + + parser, _ = etl.load.main.get_parser_instance( + submitting_user='user@test.com', + # note the leading slash should be stripped + request_type='/test/v1', + init_params={'project': 'not-overriden'}, + ) + self.assertIsNotNone(parser) + self.assertIsInstance(parser, TestGetParserInstance) + + # do this assert for static analysis + assert isinstance(parser, TestGetParserInstance) + self.assertEqual(parser.project, 'not-overriden') + + @patch('etl.load.main.get_accessor_config') + @patch('etl.load.main.prepare_parser_map') + def test_get_parser_instance_different_parser_name( + self, mock_prepare_parser_map, mock_get_accessor_config + ): + """Test get_parser_instance success""" + + mock_get_accessor_config.return_value = { + 'user@test.com': [ + { + 'name': 'test/v1', + 'default_parameters': {'project': 'test'}, + 'parser_name': 'different_parser/name', + } + ] + } + + mock_prepare_parser_map.return_value = { + 'different_parser/name': TestGetParserInstance, + } + + parser, _ = etl.load.main.get_parser_instance( + submitting_user='user@test.com', + request_type='/test/v1', + init_params={'project': 'to_override'}, + ) + self.assertIsNotNone(parser) + self.assertIsInstance(parser, TestGetParserInstance) + + # do this assert for static analysis + assert isinstance(parser, TestGetParserInstance) + self.assertEqual(parser.project, 'test') + + @patch('etl.load.main.get_accessor_config') + def test_get_parser_instance_fail_no_user(self, mock_get_accessor_config): + """Test get_parser_instance success""" + + mock_get_accessor_config.return_value = {'user@test.com': []} + + parser, error = etl.load.main.get_parser_instance( + submitting_user='non-existent-user@different.com', + request_type='/test/v1', + init_params={'project': 'to_override'}, + ) + self.assertIsNone(parser) + self.assertEqual( + 'Submitting user non-existent-user@different.com is not allowed to access any parsers', + error, + ) + + @patch('etl.load.main.get_accessor_config') + @patch('etl.load.main.prepare_parser_map') + def test_get_parser_no_matching_config( + self, mock_prepare_parser_map, mock_get_accessor_config + ): + """Test get_parser_instance success""" + + mock_get_accessor_config.return_value = { + 'user@test.com': [ + { + 'name': 'test/v1', + 'default_parameters': {'project': 'test'}, + } + ] + } + + # this doesn't need to be mocked as it fails before here + mock_prepare_parser_map.return_value = { + 'test/v1': TestGetParserInstance, + } + + parser, error = etl.load.main.get_parser_instance( + submitting_user='user@test.com', + request_type='test/v2', + init_params={'project': 'to_override'}, + ) + self.assertIsNone(parser) + self.assertEqual( + 'Submitting user user@test.com is not allowed to access test/v2', + error, + ) + + @patch('etl.load.main.get_accessor_config') + @patch('etl.load.main.prepare_parser_map') + def test_get_parser_no_matching_parser( + self, mock_prepare_parser_map, mock_get_accessor_config + ): + """Test get_parser_instance success""" + + mock_get_accessor_config.return_value = { + 'user@test.com': [ + { + 'name': 'a/b', + 'default_parameters': {'project': 'test'}, + } + ] + } + + mock_prepare_parser_map.return_value = { + 'c/d': TestGetParserInstance, + } + + parser, error = etl.load.main.get_parser_instance( + submitting_user='user@test.com', + request_type='a/b', + init_params={'project': 'to_override'}, + ) + self.assertIsNone(parser) + self.assertEqual( + 'Submitting user user@test.com could not find parser for a/b', + error, + ) diff --git 
a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index 8e69d2577..aa7627a15 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -1,3 +1,4 @@ +# pylint: disable=R0904 """ Make metamist architecture available to production pulumi stack so it can be centrally deployed. Do this through a plugin, and submodule. @@ -59,20 +60,22 @@ def main(self): @cached_property def _svc_cloudresourcemanager(self): + assert self.config.metamist return gcp.projects.Service( 'metamist-cloudresourcemanager-service', service='cloudresourcemanager.googleapis.com', disable_on_destroy=False, - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, ) @cached_property def _svc_iam(self): + assert self.config.metamist return gcp.projects.Service( 'metamist-iam-service', service='iam.googleapis.com', disable_on_destroy=False, - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, opts=pulumi.resource.ResourceOptions( depends_on=[self._svc_cloudresourcemanager] ), @@ -80,60 +83,80 @@ def _svc_iam(self): @cached_property def _svc_functions(self): + assert self.config.metamist return gcp.projects.Service( 'metamist-cloudfunctions-service', service='cloudfunctions.googleapis.com', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @cached_property def _svc_pubsub(self): + assert self.config.metamist return gcp.projects.Service( 'metamist-pubsub-service', service='pubsub.googleapis.com', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @cached_property def _svc_scheduler(self): + assert self.config.metamist return gcp.projects.Service( 'metamist-cloudscheduler-service', service='cloudscheduler.googleapis.com', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @cached_property def _svc_build(self): + assert self.config.metamist return gcp.projects.Service( 'metamist-cloudbuild-service', service='cloudbuild.googleapis.com', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, disable_on_destroy=False, ) @cached_property def _svc_bigquery(self): + assert self.config.metamist return gcp.projects.Service( 'metamist-bigquery-service', service='bigquery.googleapis.com', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, disable_on_destroy=False, ) + @cached_property + def _svc_secretmanager(self): + assert self.config.metamist + return gcp.projects.Service( + 'metamist-secretmanager-service', + service='secretmanager.googleapis.com', + disable_on_destroy=False, + opts=pulumi.resource.ResourceOptions( + depends_on=[self._svc_cloudresourcemanager] + ), + project=self.config.metamist.gcp.project, + ) + @cached_property def source_bucket(self): """ We will store the source code to the Cloud Function in a Google Cloud Storage bucket. 
""" + assert self.config.gcp + assert self.config.metamist return gcp.storage.Bucket( 'metamist-source-bucket', name=f'{self.config.gcp.dataset_storage_prefix}metamist-source-bucket', location=self.config.gcp.region, - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, uniform_bucket_level_access=True, ) @@ -141,10 +164,11 @@ def _etl_function_account(self, f_name: str): """ Service account for cloud function """ + assert self.config.metamist return gcp.serviceaccount.Account( f'metamist-etl-{f_name}service-account', account_id=f'metamist-etl-{f_name}sa', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( depends_on=[self._svc_iam], ), @@ -166,28 +190,101 @@ def etl_extract_service_account(self): return self._etl_function_account('extract-') @cached_property - def etl_accessors(self): + def etl_accessors(self) -> dict[str, gcp.serviceaccount.Account]: """Service account to run endpoint + ingestion as""" + assert self.config.metamist + assert self.config.metamist.etl + assert self.config.metamist.etl.accessors return { name: gcp.serviceaccount.Account( f'metamist-etl-accessor-{name}', account_id=f'metamist-etl-{name}', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( depends_on=[self._svc_iam], ), ) - for name in self.config.sample_metadata.etl_accessors + # keys only + for name in self.config.metamist.etl.accessors } + @cached_property + def etl_configuration_secret(self): + """ + Get the secret for the etl-accessor-configuration + Nothing is secret, just an easy k-v store + """ + assert self.config.gcp + assert self.config.metamist + + return gcp.secretmanager.Secret( + 'metamist-etl-accessor-configuration-secret', + secret_id='accessor-configuration', + replication=gcp.secretmanager.SecretReplicationArgs( + user_managed=gcp.secretmanager.SecretReplicationUserManagedArgs( + replicas=[ + gcp.secretmanager.SecretReplicationUserManagedReplicaArgs( + location=self.config.gcp.region, + ), + ], + ), + ), + opts=pulumi.resource.ResourceOptions(depends_on=[self._svc_secretmanager]), + project=self.config.metamist.gcp.project, + ) + + @cached_property + def etl_configuration_secret_version(self): + """Get the versioned secret, that contains the latest configuration""" + assert self.config.metamist + assert self.config.metamist.etl + assert self.config.metamist.etl.accessors + + etl_accessor_config = { + k: v.to_dict() for k, v in self.config.metamist.etl.accessors.items() + } + + def map_accessors_to_new_body(arg): + accessors: dict[str, str] = dict(arg) + # dict[gcp.serviceaccount.Account: dict[str, ]] + remapped = {accessors[k]: v for k, v in etl_accessor_config.items()} + return json.dumps(remapped) + + etl_accessors_emails: dict[str, pulumi.Output[str]] = { + k: v.email for k, v in self.etl_accessors.items() + } + remapped_with_id = pulumi.Output.all(**etl_accessors_emails).apply( + map_accessors_to_new_body + ) + return gcp.secretmanager.SecretVersion( + 'metamist-etl-accessor-configuration', + secret=self.etl_configuration_secret.id, + secret_data=remapped_with_id, + ) + + def _setup_etl_configuration_secret_value(self): + # allow etl-runner to access secret + assert self.config.metamist + + gcp.secretmanager.SecretIamMember( + 'metamist-etl-accessor-configuration-access', + project=self.config.metamist.gcp.project, + secret_id=self.etl_configuration_secret.id, + role='role/secretmanager.secretAccessor', 
+ member=pulumi.Output.concat( + 'serviceAccount:', self.etl_load_service_account.email + ), + ) + @cached_property def etl_pubsub_topic(self): """ Pubsub topic to trigger the etl function """ + assert self.config.metamist return gcp.pubsub.Topic( 'metamist-etl-topic', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions(depends_on=[self._svc_pubsub]), ) @@ -196,16 +293,17 @@ def etl_pubsub_dead_letters_topic(self): """ Pubsub dead_letters topic to capture failed jobs """ + assert self.config.metamist topic = gcp.pubsub.Topic( 'metamist-etl-dead-letters-topic', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions(depends_on=[self._svc_pubsub]), ) # give publisher permission to service account gcp.pubsub.TopicIAMPolicy( 'metamist-etl-dead-letters-topic-iam-policy', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, topic=topic.name, policy_data=self.prepare_service_account_policy_data( 'roles/pubsub.publisher' @@ -220,6 +318,8 @@ def etl_pubsub_push_subscription(self): Pubsub push_subscription to topic, new messages to topic triggeres load process """ + assert self.config.metamist + subscription = gcp.pubsub.Subscription( 'metamist-etl-subscription', topic=self.etl_pubsub_topic.name, @@ -237,7 +337,7 @@ def etl_pubsub_push_subscription(self): 'x-goog-version': 'v1', }, ), - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( depends_on=[ self._svc_pubsub, @@ -249,7 +349,7 @@ def etl_pubsub_push_subscription(self): # give subscriber permission to service account gcp.pubsub.SubscriptionIAMPolicy( 'metamist-etl-pubsub-topic-subscription-policy', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, subscription=subscription.name, policy_data=self.prepare_service_account_policy_data( 'roles/pubsub.subscriber' @@ -263,10 +363,12 @@ def etl_pubsub_dead_letter_subscription(self): """ Dead letter subscription """ + assert self.config.metamist + return gcp.pubsub.Subscription( 'metamist-etl-dead-letter-subscription', topic=self.etl_pubsub_dead_letters_topic.name, - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, ack_deadline_seconds=20, ) @@ -275,6 +377,8 @@ def etl_bigquery_dataset(self): """ Bigquery dataset to contain the bigquery table """ + assert self.config.gcp + assert self.config.metamist return gcp.bigquery.Dataset( 'metamist-etl-bigquery-dataset', dataset_id='metamist', @@ -285,7 +389,7 @@ def etl_bigquery_dataset(self): labels={ 'project': 'metamist', }, - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, opts=pulumi.ResourceOptions( depends_on=[self._svc_bigquery], ), @@ -293,6 +397,8 @@ def etl_bigquery_dataset(self): def _setup_bq_table(self, schema_file_name: Path, table_id: str, name_suffix: str): """Setup Bigquery table""" + assert self.config.metamist + with open(schema_file_name) as f: schema = f.read() @@ -302,7 +408,7 @@ def _setup_bq_table(self, schema_file_name: Path, table_id: str, name_suffix: st dataset_id=self.etl_bigquery_dataset.dataset_id, labels={'project': 'metamist'}, schema=schema, - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, # docs say: Note: On newer versions of the provider, # you must explicitly set deletion_protection=False, @@ -333,7 
+439,6 @@ def prepare_service_account_policy_data(self, role): We need to give this account the ability to publish and read the topic """ - # get project project = gcp.organizations.get_project() return gcp.organizations.get_iam_policy( @@ -345,7 +450,7 @@ def prepare_service_account_policy_data(self, role): 'serviceAccount:service-', project.number, '@gcp-sa-pubsub.iam.gserviceaccount.com', - ) + ) # type: ignore ], ) ] @@ -355,17 +460,19 @@ def _setup_etl(self): """ setup_etl """ + assert self.config.metamist + # give the etl_load/extract service_accounts ability to read/write to bq table gcp.bigquery.DatasetAccess( 'metamist-etl-bq-dataset-extract-service-access', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, dataset_id=self.etl_bigquery_dataset.dataset_id, role='WRITER', user_by_email=self.etl_extract_service_account.email, ) gcp.bigquery.DatasetAccess( 'metamist-etl-bq-dataset-load-service-access', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, dataset_id=self.etl_bigquery_dataset.dataset_id, role='WRITER', user_by_email=self.etl_load_service_account.email, @@ -373,7 +480,7 @@ def _setup_etl(self): # give the etl_load_service_account ability to execute bigquery jobs gcp.projects.IAMMember( 'metamist-etl-bq-job-user-role', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, role='roles/bigquery.jobUser', member=pulumi.Output.concat( 'serviceAccount:', self.etl_load_service_account.email @@ -382,7 +489,7 @@ def _setup_etl(self): # give the etl_extract_service_account ability to push to pub/sub gcp.projects.IAMMember( 'metamist-etl-extract-editor-role', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, role='roles/editor', member=pulumi.Output.concat( 'serviceAccount:', self.etl_extract_service_account.email @@ -391,7 +498,7 @@ def _setup_etl(self): # give the etl_load_service_account ability to push to pub/sub gcp.projects.IAMMember( 'metamist-etl-load-editor-role', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, role='roles/editor', member=pulumi.Output.concat( 'serviceAccount:', self.etl_load_service_account.email @@ -409,7 +516,7 @@ def _setup_etl(self): ) gcp.projects.IAMMember( 'metamist-etl-robot-service-agent-role', - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, role='roles/run.serviceAgent', member=robot_account, ) @@ -418,6 +525,7 @@ def _setup_etl(self): self._setup_etl_pubsub() self._setup_metamist_etl_accessors() + self._setup_etl_configuration_secret_value() def _setup_etl_functions(self): """ @@ -500,6 +608,9 @@ def _etl_function( """ Driver function to setup the etl cloud function """ + assert self.config.gcp + assert self.config.metamist + assert self.config.metamist.etl path_to_func_folder = ETL_FOLDER / f_name @@ -512,7 +623,7 @@ def _etl_function( 'requirements.txt': append_private_repositories_to_requirements( filename=f'{str(path_to_func_folder.absolute())}/requirements.txt', private_repo_url=private_repo_url, - private_repos=self.config.sample_metadata.etl_private_repo_packages, + private_repos=self.config.metamist.etl.private_repo_packages, ), } archive = archive_folder( @@ -542,6 +653,9 @@ def _etl_function( runtime='python311', entry_point=f'etl_{f_name}', environment_variables={}, + # this one is set on an output, so specifying it keeps the function + # from being updated, or appearing 
to update + docker_repository=f'projects/{self.config.metamist.gcp.project}/locations/australia-southeast1/repositories/gcf-artifacts', source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs( storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs( bucket=self.source_bucket.name, @@ -553,7 +667,7 @@ def _etl_function( max_instance_count=1, # Keep max instances to 1 to avoid racing conditions min_instance_count=0, available_memory='2Gi', - available_cpu=1, + available_cpu='1', timeout_seconds=540, environment_variables={ # format: 'project.dataset.table_id @@ -572,19 +686,19 @@ def _etl_function( self.etl_bigquery_log_table.table_id, ), 'PUBSUB_TOPIC': self.etl_pubsub_topic.id, - 'NOTIFICATION_PUBSUB_TOPIC': self.etl_slack_notification_topic.id - if self.etl_slack_notification_topic - else '', - 'SM_ENVIRONMENT': self.config.sample_metadata.etl_environment, - 'DEFAULT_LOAD_CONFIG': json.dumps( - self.config.sample_metadata.etl_parser_default_config + 'NOTIFICATION_PUBSUB_TOPIC': ( + self.etl_slack_notification_topic.id + if self.etl_slack_notification_topic + else '' ), - }, + 'SM_ENVIRONMENT': self.config.metamist.etl.environment, + 'CONFIGURATION_SECRET': self.etl_configuration_secret_version.id, + }, # type: ignore ingress_settings='ALLOW_ALL', all_traffic_on_latest_revision=True, service_account_email=sa.email, ), - project=self.config.sample_metadata.gcp.project, + project=self.config.metamist.gcp.project, location=self.config.gcp.region, opts=pulumi.ResourceOptions( depends_on=[ @@ -622,15 +736,20 @@ def etl_slack_notification(self): """ Setup Slack notification """ + assert self.config.gcp + assert self.config.metamist + assert self.config.metamist.slack_channel + assert self.config.billing + assert self.config.billing.aggregator slack_config = SlackNotificationConfig( - project_name=self.config.sample_metadata.gcp.project, + project_name=self.config.metamist.gcp.project, location=self.config.gcp.region, service_account=self.etl_service_account, # can be some other account source_bucket=self.source_bucket, slack_secret_project_id=self.config.billing.gcp.project_id, slack_token_secret_name=self.config.billing.aggregator.slack_token_secret_name, - slack_channel_name=self.config.sample_metadata.slack_channel, + slack_channel_name=self.config.metamist.slack_channel, ) notification = SlackNotification( diff --git a/metamist_infrastructure/setup.py b/metamist_infrastructure/setup.py index a4518674b..f51544b3c 100644 --- a/metamist_infrastructure/setup.py +++ b/metamist_infrastructure/setup.py @@ -15,7 +15,7 @@ setup( name=PKG, # not super important as we don't deploy this plugin - version='1.0.1', + version='1.1.0', description='Metamist infrastructure plugin for cpg-infrastructure', long_description=readme, long_description_content_type='text/markdown', diff --git a/metamist_infrastructure/slack_notification.py b/metamist_infrastructure/slack_notification.py index 89a9ab1ec..b6843e014 100644 --- a/metamist_infrastructure/slack_notification.py +++ b/metamist_infrastructure/slack_notification.py @@ -81,7 +81,7 @@ def __init__( self, slack_config: SlackNotificationConfig, topic_name: str, # e.g. 
'metamist-etl-notification' - func_to_monitor: list | None, + func_to_monitor: list[str], notification_type: SlackNotificationType, depends_on: list | None, ): @@ -176,6 +176,9 @@ def notification_cloudfun(self): runtime='python311', entry_point='etl_notify', environment_variables={}, + # this one is set on an output, so specifying it keeps the function + # from being updated, or appearing to update + docker_repository=f'projects/{self.config.project_name}/locations/australia-southeast1/repositories/gcf-artifacts', source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceArgs( storage_source=gcp.cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs( bucket=self.config.source_bucket.name, @@ -187,7 +190,7 @@ def notification_cloudfun(self): max_instance_count=1, # Keep max instances to 1 to avoid racing conditions min_instance_count=0, available_memory='2Gi', - available_cpu=1, + available_cpu='1', timeout_seconds=540, environment_variables={ 'SLACK_BOT_TOKEN': read_secret( @@ -197,7 +200,7 @@ def notification_cloudfun(self): fail_gracefully=False, ), 'SLACK_CHANNEL': self.config.slack_channel_name, - }, + }, # type: ignore ingress_settings='ALLOW_ALL', all_traffic_on_latest_revision=True, service_account_email=self.config.service_account.email, From 81644b06fb2256fd0b6ffa4e297fcad5a73c3c55 Mon Sep 17 00:00:00 2001 From: Vivian Bakiris <79084890+vivbak@users.noreply.github.com> Date: Tue, 13 Feb 2024 09:35:02 +1100 Subject: [PATCH 2/5] Fix Existing Cohort Parser Sequencing Type Param (#679) * Fix click overwriting param to None when not specified * Fix whitespace after : for the linter --- scripts/parse_existing_cohort.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/parse_existing_cohort.py b/scripts/parse_existing_cohort.py index 2603a8d60..1df35063c 100644 --- a/scripts/parse_existing_cohort.py +++ b/scripts/parse_existing_cohort.py @@ -216,6 +216,7 @@ def get_existing_external_sequence_ids(self, participant_map: dict[str, dict]): '--sequencing-type', type=click.Choice(['genome', 'exome']), help='Sequencing type: genome or exome', + default='genome', ) @click.option('--search-location', 'search_locations', multiple=True) @click.option( @@ -239,11 +240,11 @@ async def main( project: str, search_locations: List[str], batch_number: Optional[str], + sequencing_type: str, confirm=True, dry_run=False, include_participant_column=False, allow_missing_files=False, - sequencing_type: str = 'genome', ): """Run script from CLI arguments""" From 08795fa319296fe69f7f502156fd4e0924f6332e Mon Sep 17 00:00:00 2001 From: Michael Franklin <22381693+illusional@users.noreply.github.com> Date: Wed, 14 Feb 2024 16:28:02 +1100 Subject: [PATCH 3/5] Fix role description (#682) * Fix role description * Fix import spacing --------- Co-authored-by: Michael Franklin --- metamist_infrastructure/driver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metamist_infrastructure/driver.py b/metamist_infrastructure/driver.py index aa7627a15..2a12e0c3e 100644 --- a/metamist_infrastructure/driver.py +++ b/metamist_infrastructure/driver.py @@ -270,7 +270,7 @@ def _setup_etl_configuration_secret_value(self): 'metamist-etl-accessor-configuration-access', project=self.config.metamist.gcp.project, secret_id=self.etl_configuration_secret.id, - role='role/secretmanager.secretAccessor', + role='roles/secretmanager.secretAccessor', member=pulumi.Output.concat( 'serviceAccount:', self.etl_load_service_account.email ), From 9199dab65cb1d3ad1e0d4c731ba89ef6a770b637 Mon Sep 17 00:00:00 
2001 From: EddieLF <34049565+EddieLF@users.noreply.github.com> Date: Fri, 16 Feb 2024 16:11:18 +1100 Subject: [PATCH 4/5] Rna ingestion updates (#676) * Updating parsers with library type etc * Add new sample map column defaults, update sg meta on RNA ingest * Update parser column default, update rna sg meta, update parsing prints * Change reference to 'sequence' to 'assay' * Change reference to 'library_type' to 'sequencing_library' * Change another reference to 'library_type' to 'sequencing_library' * Add required fields for exomes, more tidying, use tabulate * Add tabulate to requirements * Fix parse_manifest return for tests, update script referencing group_assays fn * Remove required exome fields, re-add default analysis status 'completed' * Update parser testing with RNA examples * Remove added external_family_id from ParsedParticipant class * Refactor parser to use SequencingDefualts class * Collapse sequencing defaults into new class * Update setup.py and resolve merge conflicts * isort linting * Fix typehint for read and ref files function return --- metamist/parser/generic_metadata_parser.py | 209 ++++++++++++++------- metamist/parser/generic_parser.py | 191 ++++++++++++++----- metamist/parser/sample_file_map_parser.py | 103 +++++++--- scripts/parse_existing_cohort.py | 6 +- scripts/parse_ont_sheet.py | 38 ++-- scripts/parse_sample_file_map.py | 18 +- scripts/parse_vcgs_manifest.py | 6 +- setup.py | 1 + test/test_parse_existing_cohort.py | 4 +- test/test_parse_file_map.py | 172 ++++++++++++++++- 10 files changed, 569 insertions(+), 179 deletions(-) diff --git a/metamist/parser/generic_metadata_parser.py b/metamist/parser/generic_metadata_parser.py index d89104df0..3c9758770 100644 --- a/metamist/parser/generic_metadata_parser.py +++ b/metamist/parser/generic_metadata_parser.py @@ -4,15 +4,17 @@ import re import shlex from functools import reduce -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union import click from metamist.parser.generic_parser import ( # noqa + DefaultSequencing, GenericParser, GroupedRow, ParsedAnalysis, ParsedAssay, + ParsedSample, ParsedSequencingGroup, SingleRow, run_as_sync, @@ -92,16 +94,22 @@ def __init__( seq_type_column: Optional[str] = None, seq_technology_column: Optional[str] = None, seq_platform_column: Optional[str] = None, + seq_facility_column: Optional[str] = None, + seq_library_column: Optional[str] = None, + read_end_type_column: Optional[str] = None, + read_length_column: Optional[str] = None, gvcf_column: Optional[str] = None, meta_column: Optional[str] = None, seq_meta_column: Optional[str] = None, batch_number: Optional[str] = None, reference_assembly_location_column: Optional[str] = None, default_reference_assembly_location: Optional[str] = None, - default_sequencing_type='genome', default_sample_type=None, - default_sequencing_technology='short-read', - default_sequencing_platform='illumina', + default_sequencing=DefaultSequencing( + seq_type='genome', technology='short-read', platform='illumina' + ), + default_read_end_type: Optional[str] = None, + default_read_length: Optional[str | int] = None, allow_extra_files_in_search_path=False, **kwargs, ): @@ -109,10 +117,10 @@ def __init__( path_prefix=None, search_paths=search_locations, project=project, - default_sequencing_type=default_sequencing_type, default_sample_type=default_sample_type, - default_sequencing_technology=default_sequencing_technology, - default_sequencing_platform=default_sequencing_platform, + 
default_sequencing=default_sequencing, + default_read_end_type=default_read_end_type, + default_read_length=default_read_length, **kwargs, ) @@ -130,6 +138,10 @@ def __init__( self.seq_type_column = seq_type_column self.seq_technology_column = seq_technology_column self.seq_platform_column = seq_platform_column + self.seq_facility_column = seq_facility_column + self.seq_library_column = seq_library_column + self.read_end_type_column = read_end_type_column + self.read_length_column = read_length_column self.reference_assembly_location_column = reference_assembly_location_column self.default_reference_assembly_location = default_reference_assembly_location @@ -168,14 +180,14 @@ def get_sequencing_types(self, row: GroupedRow) -> list[str]: if isinstance(row, dict): return [self.get_sequencing_type(row)] return [ - str(r.get(self.seq_type_column, self.default_sequencing_type)) for r in row + str(r.get(self.seq_type_column, self.default_sequencing.seq_type)) for r in row ] def get_sequencing_technology(self, row: SingleRow) -> str: """Get assay technology for single row""" value = ( row.get(self.seq_technology_column, None) - or self.default_sequencing_technology + or self.default_sequencing.technology ) value = value.lower() @@ -187,7 +199,7 @@ def get_sequencing_technology(self, row: SingleRow) -> str: def get_sequencing_platform(self, row: SingleRow) -> str: """Get sequencing platform for single row""" value = ( - row.get(self.seq_platform_column, None) or self.default_sequencing_platform + row.get(self.seq_platform_column, None) or self.default_sequencing.platform ) value = value.lower() @@ -195,7 +207,7 @@ def get_sequencing_platform(self, row: SingleRow) -> str: def get_sequencing_type(self, row: SingleRow) -> str: """Get assay type from row""" - value = row.get(self.seq_type_column, None) or self.default_sequencing_type + value = row.get(self.seq_type_column, None) or self.default_sequencing.seq_type value = value.lower() if value == 'wgs': @@ -204,9 +216,47 @@ def get_sequencing_type(self, row: SingleRow) -> str: value = 'exome' elif 'mt' in value: value = 'mtseq' + elif 'polya' in value or 'mrna' in value: + value = 'polyarna' + elif 'total' in value: + value = 'totalrna' + elif 'single' in value: + value = 'singlecellrna' return str(value) + def get_sequencing_facility(self, row: SingleRow) -> str: + """Get sequencing facility from row""" + value = ( + row.get(self.seq_facility_column, None) or self.default_sequencing.facility + ) + value = str(value) if value else None + return value + + def get_sequencing_library(self, row: SingleRow) -> str: + """Get sequencing library from row""" + value = ( + row.get(self.seq_library_column, None) or self.default_sequencing.library + ) + value = str(value) if value else None + return value + + def get_read_end_type(self, row: SingleRow) -> str: + """Get read end type from row""" + value = ( + row.get(self.read_end_type_column, None) or self.default_read_end_type + ) + value = str(value).lower() if value else None + return value + + def get_read_length(self, row: SingleRow) -> int: + """Get read length from row""" + value = ( + row.get(self.read_length_column, None) or self.default_read_length + ) + value = int(value) if value else None + return value + def get_assay_id(self, row: GroupedRow) -> Optional[dict[str, str]]: """Get external assay ID from row. Needs to be implemented per parser. 
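        A hypothetical override, assuming a manifest with a 'run_id' column
        (column and key names illustrative only):

            def get_assay_id(self, row):
                return {'sequencing_run': row['run_id']}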
NOTE: To be re-thought after assay group changes are applied""" @@ -515,6 +565,7 @@ async def get_participant_meta_from_group(self, rows: GroupedRow): async def get_sequencing_group_meta( self, sequencing_group: ParsedSequencingGroup ) -> dict: + """Get sequencing group metadata from variant (vcf) files""" meta: dict[str, Any] = {} if not sequencing_group.sample.external_sid: @@ -552,6 +603,55 @@ async def get_sequencing_group_meta( return meta + async def get_read_and_ref_files_and_checksums(self, sample_id: str, rows: GroupedRow) -> ( + Tuple[List[str], List[str], Set[str]]): + """Get read filenames and checksums from rows.""" + read_filenames: List[str] = [] + read_checksums: List[str] = [] + reference_assemblies: set[str] = set() + for r in rows: + _rfilenames = await self.get_read_filenames(sample_id=sample_id, row=r) + read_filenames.extend(_rfilenames) + if self.checksum_column and self.checksum_column in r: + checksums = await self.get_checksums_from_row(sample_id, r, _rfilenames) + if not checksums: + checksums = [None] * len(_rfilenames) + read_checksums.extend(checksums) + if self.reference_assembly_location_column: + ref = r.get(self.reference_assembly_location_column) + if ref: + reference_assemblies.add(ref) + return read_filenames, read_checksums, reference_assemblies + + async def parse_cram_assays(self, sample: ParsedSample, reference_assemblies: set[str]) -> dict[str, Any]: + """Parse CRAM assays""" + if len(reference_assemblies) > 1: + # sorted for consistent testing + str_ref_assemblies = ', '.join(sorted(reference_assemblies)) + raise ValueError( + f'Multiple reference assemblies were defined for {sample.external_sid}: {str_ref_assemblies}' + ) + if len(reference_assemblies) == 1: + ref = next(iter(reference_assemblies)) + else: + ref = self.default_reference_assembly_location + if not ref: + raise ValueError( + f'Reads type for {sample.external_sid!r} is CRAM, but a reference ' + f'is not defined, please set the default reference assembly path' + ) + + ref_fp = self.file_path(ref) + secondary_files = ( + await self.create_secondary_file_objects_by_potential_pattern( + ref_fp, ['.fai'] + ) + ) + cram_reference = await self.create_file_object( + ref_fp, secondary_files=secondary_files + ) + return {'reference_assembly' : cram_reference} + async def get_assays_from_group( self, sequencing_group: ParsedSequencingGroup ) -> list[ParsedAssay]: @@ -566,27 +666,9 @@ async def get_assays_from_group( assays = [] - read_filenames: List[str] = [] - read_checksums: List[str] = [] - reference_assemblies: set[str] = set() - - for r in rows: - _rfilenames = await self.get_read_filenames( - sample_id=sample.external_sid, row=r - ) - read_filenames.extend(_rfilenames) - if self.checksum_column and self.checksum_column in r: - checksums = await self.get_checksums_from_row( - sample.external_sid, r, _rfilenames - ) - if not checksums: - checksums = [None] * len(_rfilenames) - read_checksums.extend(checksums) - - if self.reference_assembly_location_column: - ref = r.get(self.reference_assembly_location_column) - if ref: - reference_assemblies.add(ref) + read_filenames, read_checksums, reference_assemblies = await self.get_read_and_ref_files_and_checksums( + sample.external_sid, rows + ) # strip in case collaborator put "file1, file2" full_read_filenames: List[str] = [] @@ -614,37 +696,32 @@ async def get_assays_from_group( # collapsed_assay_meta['reads'] = reads[reads_type] if reads_type == 'cram': - if len(reference_assemblies) > 1: - # sorted for consistent testing - str_ref_assemblies 
= ', '.join(sorted(reference_assemblies)) - raise ValueError( - f'Multiple reference assemblies were defined for {sample.external_sid}: {str_ref_assemblies}' - ) - if len(reference_assemblies) == 1: - ref = next(iter(reference_assemblies)) - else: - ref = self.default_reference_assembly_location - - if not ref: - raise ValueError( - f'Reads type for {sample.external_sid!r} is CRAM, but a reference ' - f'is not defined, please set the default reference assembly path' - ) - - ref_fp = self.file_path(ref) - secondary_files = ( - await self.create_secondary_file_objects_by_potential_pattern( - ref_fp, ['.fai'] - ) + collapsed_assay_meta.update( + await self.parse_cram_assays(sample, reference_assemblies) ) - cram_reference = await self.create_file_object( - ref_fp, secondary_files=secondary_files - ) - collapsed_assay_meta['reference_assembly'] = cram_reference if self.batch_number is not None: collapsed_assay_meta['batch'] = self.batch_number + if sequencing_group.sequencing_type in ['exome', 'polyarna', 'totalrna', 'singlecellrna']: + rows = sequencing_group.rows + # Exome / RNA should have facility and library, allow missing for exomes - for now + if self.get_sequencing_facility(rows[0]): + collapsed_assay_meta['sequencing_facility'] = self.get_sequencing_facility(rows[0]) + if self.get_sequencing_library(rows[0]): + collapsed_assay_meta['sequencing_library'] = self.get_sequencing_library(rows[0]) + # RNA requires read end type and length as well + if sequencing_group.sequencing_type != 'exome': + collapsed_assay_meta['read_end_type'] = self.get_read_end_type(rows[0]) + collapsed_assay_meta['read_length'] = self.get_read_length(rows[0]) + # if any of the above fields are not set for an RNA assay, raise an error + if not all(collapsed_assay_meta.values()): + raise ValueError( + f'Not all required fields were set for RNA sample {sample.external_sid}:\n' + f'{collapsed_assay_meta}\n' + f'Use the default value arguments if they are not present in the manifest.' + ) + for read in reads[reads_type]: assays.append( ParsedAssay( @@ -653,8 +730,8 @@ async def get_assays_from_group( # as we grab all reads, and then determine assay # grouping from there. 
rows=sequencing_group.rows, - internal_seq_id=None, - external_seq_ids={}, + internal_assay_id=None, + external_assay_ids={}, # unfortunately hard to break them up by row in the current format # assay_status=self.get_assay_status(rows), assay_type='sequencing', @@ -667,6 +744,8 @@ async def get_assays_from_group( }, ) ) + if not sequencing_group.meta: + sequencing_group.meta = self.get_sequencing_group_meta_from_assays(assays) return assays @@ -803,9 +882,9 @@ async def main( if not manifests: raise ValueError('Expected at least 1 manifest') - extra_seach_paths = [m for m in manifests if m.startswith('gs://')] - if extra_seach_paths: - search_path = list(set(search_path).union(set(extra_seach_paths))) + extra_search_paths = [m for m in manifests if m.startswith('gs://')] + if extra_search_paths: + search_path = list(set(search_path).union(set(extra_search_paths))) participant_meta_map: Dict[Any, Any] = {} sample_meta_map: Dict[Any, Any] = {} @@ -839,7 +918,9 @@ async def main( reported_gender_column=reported_gender_column, karyotype_column=karyotype_column, default_sample_type=default_sample_type, - default_sequencing_type=default_assay_type, + default_sequencing=DefaultSequencing( + seq_type='genome', technology='short-read', platform='illumina' + ), search_locations=search_path, ) for manifest in manifests: diff --git a/metamist/parser/generic_parser.py b/metamist/parser/generic_parser.py index 9bee4fe70..8dc2de5d7 100644 --- a/metamist/parser/generic_parser.py +++ b/metamist/parser/generic_parser.py @@ -28,6 +28,7 @@ ) from cloudpathlib import AnyPath +from tabulate import tabulate from metamist.apis import AnalysisApi, AssayApi, ParticipantApi, SampleApi from metamist.graphql import gql, query_async @@ -65,6 +66,7 @@ + GVCF_EXTENSIONS + VCF_EXTENSIONS ) +RNA_SEQ_TYPES = ['polyarna', 'totalrna', 'singlecellrna'] # construct rmatch string to capture all fastq patterns rmatch_str = ( @@ -259,7 +261,7 @@ def to_sm(self) -> SampleUpsert: class ParsedSequencingGroup: - """Class for holding sequence metadata grouped by type""" + """Class for holding sequencing group metadata""" def __init__( self, @@ -268,8 +270,8 @@ def __init__( internal_seqgroup_id: int | None, external_seqgroup_id: str | None, sequencing_type: str, - sequence_technology: str, - sequence_platform: str | None, + sequencing_technology: str, + sequencing_platform: str | None, meta: dict[str, Any] | None, ): self.sample = sample @@ -278,8 +280,8 @@ def __init__( self.internal_seqgroup_id = internal_seqgroup_id self.external_seqgroup_id = external_seqgroup_id self.sequencing_type = sequencing_type - self.sequencing_technology = sequence_technology - self.sequencing_platform = sequence_platform + self.sequencing_technology = sequencing_technology + self.sequencing_platform = sequencing_platform self.meta = meta self.assays: list[ParsedAssay] = [] @@ -292,7 +294,7 @@ def to_sm(self) -> SequencingGroupUpsert: technology=self.sequencing_technology, platform=self.sequencing_platform, meta=self.meta, - assays=[sq.to_sm() for sq in self.assays or []], + assays=[a.to_sm() for a in self.assays or []], id=self.internal_seqgroup_id, ) @@ -304,16 +306,16 @@ def __init__( self, group: ParsedSequencingGroup, rows: GroupedRow, - internal_seq_id: int | None, - external_seq_ids: dict[str, str], + internal_assay_id: int | None, + external_assay_ids: dict[str, str], assay_type: str | None, meta: dict[str, Any] | None, ): self.sequencing_group = group self.rows = rows - self.internal_id = internal_seq_id - self.external_ids = external_seq_ids + 
self.internal_id = internal_assay_id + self.external_ids = external_assay_ids self.assay_type = assay_type self.meta = meta @@ -350,7 +352,7 @@ def __init__( def to_sm(self): """To SM model""" if not self.sequencing_group.internal_seqgroup_id: - raise ValueError('Sequence group ID must be filled in by now') + raise ValueError('Sequencing group ID must be filled in by now') return Analysis( status=AnalysisStatus(self.status), type=str(self.type), @@ -360,9 +362,26 @@ def to_sm(self): ) +class DefaultSequencing: + """Groups default sequencing information""" + def __init__( + self, + seq_type: str = 'genome', # seq_type because `type` is a built-in + technology: str = 'short-read', + platform: str = 'illumina', + facility: str = None, + library: str = None, + ): + self.seq_type = seq_type + self.technology = technology + self.platform = platform + self.facility = facility + self.library = library + + def chunk(iterable: Iterable[T], chunk_size=50) -> Iterator[List[T]]: """ - Chunk a sequence by yielding lists of `chunk_size` + Chunk an iterable by yielding lists of `chunk_size` """ chnk: List[T] = [] for element in iterable: @@ -398,23 +417,23 @@ def wrapper(*args, **kwargs): class GenericParser( CloudHelper ): # pylint: disable=too-many-public-methods,too-many-arguments - """Parser for VCGS manifest""" + """Parser for ingesting rows of metadata""" def __init__( # pylint: disable=too-many-arguments self, path_prefix: Optional[str], search_paths: list[str], project: str, - default_sequencing_type='genome', - default_sequencing_technology='short-read', - default_sequencing_platform: str | None = None, - default_sample_type=None, - default_analysis_type='qc', - default_analysis_status='completed', - skip_checking_gcs_objects=False, + default_sample_type: str = None, + default_sequencing: DefaultSequencing = DefaultSequencing(), + default_read_end_type: str = None, + default_read_length: str | int = None, + default_analysis_type: str = None, + default_analysis_status: str = 'completed', key_map: Dict[str, str] = None, - ignore_extra_keys=False, required_keys: Set[str] = None, + ignore_extra_keys=False, + skip_checking_gcs_objects=False, verbose=True, ): self.path_prefix = path_prefix @@ -430,12 +449,12 @@ def __init__( # pylint: disable=too-many-arguments self.project = project - self.default_sequencing_type: str = default_sequencing_type - self.default_sequencing_technology: str = default_sequencing_technology - self.default_sequencing_platform: Optional[str] = default_sequencing_platform + self.default_sequencing = default_sequencing + self.default_read_end_type: Optional[str] = default_read_end_type + self.default_read_length: Optional[str] = default_read_length self.default_sample_type: Optional[str] = default_sample_type - self.default_analysis_type: str = default_analysis_type - self.default_analysis_status: str = default_analysis_status + self.default_analysis_type: Optional[str] = default_analysis_type + self.default_analysis_status: Optional[str] = default_analysis_status # gs specific self.default_bucket = None @@ -500,8 +519,7 @@ async def parse_manifest( # pylint: disable=too-many-branches ) -> Any: """ Parse manifest from iterable (file pointer / String.IO) - - Returns a dict mapping external sample ID to CPG sample ID + Returns a summary of the parsed records. 
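# A short usage sketch of the DefaultSequencing container introduced above, passed to
# the refactored parser constructors. Project, bucket and facility/library values are
# hypothetical; the keyword arguments come from the signatures in this patch.
from metamist.parser.generic_parser import DefaultSequencing
from metamist.parser.sample_file_map_parser import SampleFileMapParser

defaults = DefaultSequencing(
    seq_type='exome',         # replaces the old default_sequencing_type argument
    technology='short-read',  # replaces default_sequencing_technology
    platform='illumina',      # replaces default_sequencing_platform
    facility='VCGS',          # optional, surfaced via get_sequencing_facility()
    library='TwistWES',       # optional, surfaced via get_sequencing_library()
)

parser = SampleFileMapParser(
    project='my-project',
    search_locations=['gs://my-bucket/manifests/'],
    default_sequencing=defaults,
)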
""" rows = await self.file_pointer_to_rows( file_pointer=file_pointer, delimiter=delimiter @@ -509,7 +527,13 @@ async def parse_manifest( # pylint: disable=too-many-branches return await self.from_json(rows, confirm, dry_run) async def from_json(self, rows, confirm=False, dry_run=False): - """Parse passed rows""" + """ + Asynchronously parse rows of data, adding chunks of participants, samples, sequencing groups, assays, and analyses. + + Groups rows of participants by their IDs. For each participant, group samples by their IDs. + If no participants are present, groups samples by their IDs. + For each sample, gets its sequencing groups by their keys. For each sequencing group, groups assays and analyses. + """ await self.validate_rows(rows) # one participant with no value @@ -536,7 +560,9 @@ async def from_json(self, rows, confirm=False, dry_run=False): sequencing_groups: list[ParsedSequencingGroup] = [] for schunk in chunk(samples): - seq_groups_for_chunk = await asyncio.gather(*map(self.group_assays, schunk)) + seq_groups_for_chunk = await asyncio.gather( + *map(self.get_sample_sequencing_groups, schunk) + ) for sample, seqgroups in zip(schunk, seq_groups_for_chunk): sample.sequencing_groups = seqgroups @@ -581,6 +607,7 @@ async def from_json(self, rows, confirm=False, dry_run=False): if dry_run: logger.info('Dry run, so returning without inserting / updating metadata') + self.prepare_detail(samples) return summary, (participants if participants else samples) if confirm: @@ -601,7 +628,10 @@ async def from_json(self, rows, confirm=False, dry_run=False): [s.to_sm() for s in samples], ) - print(json.dumps(result, indent=2)) + if self.verbose: + logger.info(json.dumps(result, indent=2)) + else: + self.prepare_detail(samples) def _get_dict_reader(self, file_pointer, delimiter: str): """ @@ -662,6 +692,29 @@ def prepare_summary( return summary + def prepare_detail(self, samples: list[ParsedSample]): + """Uses tabulate to print a detailed summary of the samples being inserted / updated""" + sample_participants = {} + for sample in samples: + sample_participants[sample.external_sid] = sample.participant.external_pid if sample.participant else None + sample_sequencing_groups = {sample.external_sid: sample.sequencing_groups for sample in samples} + + details = [] + for sample, participant in sample_participants.items(): + for sg in sample_sequencing_groups[sample]: + sg_details = { + 'Participant': participant if participant else '', + 'Sample': sample, + 'Sequencing Type': sg.sequencing_type, + 'Assays': sum(1 for a in sg.assays if not a.internal_id), + } + details.append(sg_details) + + headers = ['Participant', 'Sample', 'Sequencing Type', 'Assays'] + table = list(list(detail.values()) for detail in details) + + print(tabulate(table, headers=headers, tablefmt='grid')) + def prepare_message( self, summary, @@ -681,32 +734,38 @@ def prepare_message( header = f'Processing samples: {external_sample_ids}' assays_count: dict[str, int] = defaultdict(int) + assays_types_count: dict[str, int] = defaultdict(int) sequencing_group_counts: dict[str, int] = defaultdict(int) - for s in assays: - assays_count[str(s.assay_type)] += 1 + for a in assays: + assays_count[str(a.meta.get('sequencing_type'))] += 1 + assays_types_count[str(a.assay_type)] += 1 for sg in sequencing_groups: sequencing_group_counts[str(sg.sequencing_type)] += 1 - str_seq_count = ', '.join(f'{k}={v}' for k, v in assays_count.items()) + str_assay_count = ', '.join(f'{k}={v}' for k, v in assays_count.items()) + str_assay_types_count = ', 
'.join(f'{k}={v}' for k, v in assays_types_count.items()) str_seqg_count = ', '.join( f'{k}={v}' for k, v in sequencing_group_counts.items() ) message = f"""\ + + {self.project}: {header} - Sequence types: {str_seq_count} - Sequence group types: {str_seqg_count} + Assays count: {str_assay_count} + Assays types count: {str_assay_types_count} + Sequencing group count: {str_seqg_count} Adding {summary['participants']['insert']} participants Adding {summary['samples']['insert']} samples - Adding {summary['sequencing_groups']['insert']} sequence groups + Adding {summary['sequencing_groups']['insert']} sequencing groups Adding {summary['assays']['insert']} assays - Adding {summary['analyses']['insert']} analysis + Adding {summary['analyses']['insert']} analyses Updating {summary['participants']['update']} participants Updating {summary['samples']['update']} samples - Updating {summary['sequencing_groups']['update']} sequence groups + Updating {summary['sequencing_groups']['update']} sequencing groups Updating {summary['assays']['update']} assays """ return message @@ -905,7 +964,7 @@ async def group_samples( ) -> list[ParsedSample]: """ From a set of rows, group (by calling self.get_sample_id) - and parse sample other sample values. + and parse samples and their values. """ samples = [] for sid, sample_rows in group_by(rows, self.get_sample_id).items(): @@ -928,7 +987,7 @@ async def get_sample_meta_from_group(self, rows: GroupedRow) -> dict: def get_sequencing_group_key(self, row: SingleRow) -> Hashable: """ - Get a key to group sequencing rows by. + Get a key to group sequencing group rows by. """ if seq_group_id := self.get_sequencing_group_id(row): return seq_group_id @@ -945,10 +1004,10 @@ def get_sequencing_group_key(self, row: SingleRow) -> Hashable: return tuple(v for _, v in keys) - async def group_assays(self, sample: ParsedSample) -> list[ParsedSequencingGroup]: + async def get_sample_sequencing_groups(self, sample: ParsedSample) -> list[ParsedSequencingGroup]: """ - From a set of rows, group (by calling self.get_sequencing_group_key) - and parse sequencing group other sequencing group values. + From a set of samples, group (by calling self.get_sequencing_group_key) + and parse sequencing groups and their values. """ sequencing_groups = [] for seq_rows in group_by(sample.rows, self.get_sequencing_group_key).values(): @@ -960,8 +1019,8 @@ async def group_assays(self, sample: ParsedSample) -> list[ParsedSequencingGroup internal_seqgroup_id=None, external_seqgroup_id=self.get_sequencing_group_id(seq_rows[0]), sequencing_type=seq_type, - sequence_technology=seq_tech, - sequence_platform=seq_platform, + sequencing_technology=seq_tech, + sequencing_platform=seq_platform, meta={}, sample=sample, rows=seq_rows, @@ -998,6 +1057,23 @@ async def get_assays_from_group( return list[ParsedAssay] (does not have to equal number of rows). 
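# For reference, the prepare_detail helper added above renders its per-sequencing-group
# summary with tabulate's 'grid' format. A self-contained sketch; the rows are
# hypothetical and the rendered output is shown approximately.
from tabulate import tabulate

headers = ['Participant', 'Sample', 'Sequencing Type', 'Assays']
table = [
    ['Hera', 'sample_id001', 'totalrna', 1],
    ['Hestia', 'sample_id002', 'polyarna', 1],
]
print(tabulate(table, headers=headers, tablefmt='grid'))
# +---------------+--------------+-------------------+----------+
# | Participant   | Sample       | Sequencing Type   |   Assays |
# +===============+==============+===================+==========+
# | Hera          | sample_id001 | totalrna          |        1 |
# +---------------+--------------+-------------------+----------+
# | Hestia        | sample_id002 | polyarna          |        1 |
# +---------------+--------------+-------------------+----------+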
""" + def get_sequencing_group_meta_from_assays(self, assays: list[ParsedAssay]) -> dict: + """ + From a list of assays, get any relevant sequencing group meta + """ + meta = {} + for assay in assays: + if assay.meta.get('sequencing_type') == 'exome': + keys = ('sequencing_facility', 'sequencing_library') + elif assay.meta.get('sequencing_type') in RNA_SEQ_TYPES: + keys = ('sequencing_facility', 'sequencing_library', 'read_end_type', 'read_length') + else: + continue + for key in keys: + if assay.meta.get(key) is not None: + meta[key] = assay.meta[key] + return meta + def get_sample_type(self, row: GroupedRow) -> str: """Get sample type from row""" return self.default_sample_type @@ -1014,15 +1090,31 @@ def get_sequencing_group_id(self, row: SingleRow) -> str | None: def get_sequencing_type(self, row: SingleRow) -> str: """Get sequence types from row""" - return self.default_sequencing_type + return self.default_sequencing.seq_type def get_sequencing_technology(self, row: SingleRow) -> str: """Get sequencing technology from row""" - return self.default_sequencing_technology + return self.default_sequencing.technology def get_sequencing_platform(self, row: SingleRow) -> str | None: """Get sequencing platform from row""" - return self.default_sequencing_platform + return self.default_sequencing.platform + + def get_sequencing_facility(self, row: SingleRow) -> str | None: + """Get sequencing facility from row""" + return self.default_sequencing.facility + + def get_sequencing_library(self, row: SingleRow) -> str | None: + """Get library type from row""" + return self.default_sequencing.library + + def get_read_end_type(self, row: SingleRow) -> str | None: + """Get read end type from row""" + return self.default_read_end_type + + def get_read_length(self, row: SingleRow) -> str | None: + """Get read length from row""" + return self.default_read_length def get_analysis_type(self, sample_id: str, row: GroupedRow) -> str: """Get analysis type from row""" @@ -1293,6 +1385,7 @@ def parse_fastqs_structure(fastqs) -> List[List[str]]: invalid_fastq_groups = [grp for grp in fastq_groups.values() if len(grp) != 2] if invalid_fastq_groups: + # TODO: implement handling for single-ended reads raise ValueError(f'Invalid fastq group {invalid_fastq_groups}') sorted_groups = sorted( diff --git a/metamist/parser/sample_file_map_parser.py b/metamist/parser/sample_file_map_parser.py index 077aca912..e095d8036 100644 --- a/metamist/parser/sample_file_map_parser.py +++ b/metamist/parser/sample_file_map_parser.py @@ -6,13 +6,17 @@ import click from metamist.parser.generic_metadata_parser import GenericMetadataParser, run_as_sync -from metamist.parser.generic_parser import SingleRow +from metamist.parser.generic_parser import DefaultSequencing, SingleRow PARTICIPANT_COL_NAME = 'individual_id' SAMPLE_ID_COL_NAME = 'sample_id' READS_COL_NAME = 'filenames' SEQ_TYPE_COL_NAME = 'type' CHECKSUM_COL_NAME = 'checksum' +SEQ_FACILITY_COL_NAME = 'sequencing_facility' +SEQ_LIBRARY_COL_NAME = 'sequencing_library' +READ_END_TYPE_COL_NAME = 'read_end_type' +READ_LENGTH_COL_NAME = 'read_length' KeyMap = { PARTICIPANT_COL_NAME: [ @@ -28,6 +32,10 @@ SAMPLE_ID_COL_NAME: ['sample_id', 'sample', 'sample id'], READS_COL_NAME: ['filename', 'filenames', 'files', 'file'], SEQ_TYPE_COL_NAME: ['type', 'types', 'sequencing type', 'sequencing_type'], + SEQ_FACILITY_COL_NAME: ['facility', 'sequencing facility', 'sequencing_facility'], + SEQ_LIBRARY_COL_NAME: ['library', 'library_prep', 'library prep', 'library type', 'library_type', 
'sequencing_library', 'sequencing library'], + READ_END_TYPE_COL_NAME: ['read_end_type', 'read end type', 'read_end_types', 'read end types', 'end type', 'end_type', 'end_types', 'end types'], + READ_LENGTH_COL_NAME: ['length', 'read length', 'read_length', 'read lengths', 'read_lengths'], CHECKSUM_COL_NAME: ['md5', 'checksum'], } @@ -41,6 +49,10 @@ - 'Filenames' - ['Type'] - 'Checksum' +- ['Sequencing Facility'] - needed for exome & rna samples +- ['Library Type'] - needed for exome & rna samples +- ['End Type'] - needed for rna samples +- ['Read Length'] - needed for rna samples e.g. Sample ID Filenames @@ -63,6 +75,13 @@ Apollo sample_id004 sample_id004.filename-R1.fastq.gz Apollo sample_id004 sample_id004.filename-R2.fastq.gz +Example with optional columns for RNA samples +e.g. + Individual ID Sample ID Filenames Type Facility Library End Type Read Length + Hera sample_id001 sample_id001_TSStrtRNA_R1.fastq.gz,sample_id001_TSStrtRNA_R2.fastq.gz totalrna VCGS TSStrtRNA paired 151 + Hestia sample_id002 sample_id002_TSStrmRNA_R1.fastq.gz,sample_id002_TSStrmRNA_R2.fastq.gz polyarna VCGS TSStrmRNA paired 151 + + This format is useful for ingesting filenames for the seqr loading pipeline """ @@ -78,12 +97,15 @@ def __init__( self, search_locations: List[str], project: str, - default_sequencing_type='genome', default_sample_type='blood', - default_sequencing_technology='short-read', - default_sequencing_platform='illumina', + default_sequencing=DefaultSequencing( + seq_type='genome', technology='short-read', platform='illumina' + ), + default_read_end_type: str = None, + default_read_length: str | int = None, allow_extra_files_in_search_path=False, default_reference_assembly_location: str | None = None, + verbose=True, ): super().__init__( search_locations=search_locations, @@ -93,9 +115,14 @@ def __init__( reads_column=READS_COL_NAME, checksum_column=CHECKSUM_COL_NAME, seq_type_column=SEQ_TYPE_COL_NAME, - default_sequencing_type=default_sequencing_type, + seq_facility_column=SEQ_FACILITY_COL_NAME, + seq_library_column=SEQ_LIBRARY_COL_NAME, + read_end_type_column=READ_END_TYPE_COL_NAME, + read_length_column=READ_LENGTH_COL_NAME, default_sample_type=default_sample_type, - default_sequencing_technology=default_sequencing_technology, + default_sequencing=default_sequencing, + default_read_end_type=default_read_end_type, + default_read_length=default_read_length, default_reference_assembly_location=default_reference_assembly_location, participant_meta_map={}, sample_meta_map={}, @@ -103,6 +130,7 @@ def __init__( qc_meta_map={}, allow_extra_files_in_search_path=allow_extra_files_in_search_path, key_map=KeyMap, + verbose=verbose, ) def get_sample_id(self, row: SingleRow) -> str: @@ -127,10 +155,19 @@ def get_info() -> tuple[str, str]: help='The metamist project to import manifest into', ) @click.option('--default-sample-type', default='blood') -@click.option('--default-sequence-type', default='wgs') -@click.option('--default-sequence-technology', default='short-read') +@click.option('--default-sequencing-type', default='wgs') +@click.option('--default-sequencing-technology', default='short-read') +@click.option('--default-sequencing-facility', default=None) +@click.option('--default-sequencing-library', default=None) +@click.option('--default-read-end-type', default=None) +@click.option('--default-read-length', default=None) @click.option( - '--confirm', is_flag=True, help='Confirm with user input before updating server' + '--default-reference-assembly', + required=False, + help=( + 'CRAMs require a 
reference assembly to realign. ' + 'This must be provided if any of the reads are crams' + ), ) @click.option( '--search-path', @@ -139,52 +176,62 @@ def get_info() -> tuple[str, str]: help='Search path to search for files within', ) @click.option( - '--dry-run', is_flag=True, help='Just prepare the run, without comitting it' -) -@click.option( - '--allow-extra-files-in-search_path', + '--allow-extra-files-in-search-path', is_flag=True, help='By default, this parser will fail if there are crams, bams, fastqs ' 'in the search path that are not covered by the sample map.', ) @click.option( - '--default-reference-assembly', - required=False, - help=( - 'CRAMs require a reference assembly to realign. ' - 'This must be provided if any of the reads are crams' - ), + '--confirm', is_flag=True, help='Confirm with user input before updating server' ) +@click.option( + '--dry-run', is_flag=True, help='Just prepare the run, without comitting it' +) +@click.option('--verbose', '-v', is_flag=True, help='Verbose output') @click.argument('manifests', nargs=-1) @run_as_sync -async def main( +async def main( # pylint: disable=too-many-arguments manifests, search_path: List[str], project, default_sample_type='blood', default_sequencing_type='genome', - default_sequence_technology='short-read', + default_sequencing_technology='short-read', + default_sequencing_platform='illumina', + default_sequencing_facility: str = None, + default_sequencing_library: str = None, + default_read_end_type: str = None, + default_read_length: str = None, default_reference_assembly: str = None, + allow_extra_files_in_search_path=False, confirm=False, dry_run=False, - allow_extra_files_in_search_path=False, + verbose=False, ): """Run script from CLI arguments""" if not manifests: raise ValueError('Expected at least 1 manifest') - extra_seach_paths = [m for m in manifests if m.startswith('gs://')] - if extra_seach_paths: - search_path = list(set(search_path).union(set(extra_seach_paths))) + extra_search_paths = [m for m in manifests if m.startswith('gs://')] + if extra_search_paths: + search_path = list(set(search_path).union(set(extra_search_paths))) parser = SampleFileMapParser( project=project, default_sample_type=default_sample_type, - default_sequencing_type=default_sequencing_type, - default_sequencing_technology=default_sequence_technology, + default_sequencing=DefaultSequencing( + seq_type=default_sequencing_type, + technology=default_sequencing_technology, + platform=default_sequencing_platform, + facility=default_sequencing_facility, + library=default_sequencing_library, + ), + default_read_end_type=default_read_end_type, + default_read_length=default_read_length, + default_reference_assembly_location=default_reference_assembly, search_locations=search_path, allow_extra_files_in_search_path=allow_extra_files_in_search_path, - default_reference_assembly_location=default_reference_assembly, + verbose=verbose, ) for manifest in manifests: logger.info(f'Importing {manifest}') diff --git a/scripts/parse_existing_cohort.py b/scripts/parse_existing_cohort.py index 1df35063c..e0665e1ca 100644 --- a/scripts/parse_existing_cohort.py +++ b/scripts/parse_existing_cohort.py @@ -49,7 +49,7 @@ SingleRow, run_as_sync, ) -from metamist.parser.generic_parser import READS_EXTENSIONS +from metamist.parser.generic_parser import READS_EXTENSIONS, DefaultSequencing logger = logging.getLogger(__file__) logger.addHandler(logging.StreamHandler()) @@ -132,7 +132,9 @@ def __init__( assay_meta_map=Columns.sequence_meta_map(), 
batch_number=batch_number, allow_extra_files_in_search_path=True, - default_sequencing_type=sequencing_type, + default_sequencing=DefaultSequencing( + seq_type=sequencing_type, + ) ) def _get_dict_reader(self, file_pointer, delimiter: str): diff --git a/scripts/parse_ont_sheet.py b/scripts/parse_ont_sheet.py index 61865a122..29fe0c2ba 100644 --- a/scripts/parse_ont_sheet.py +++ b/scripts/parse_ont_sheet.py @@ -6,7 +6,11 @@ import click from metamist.parser.generic_metadata_parser import GenericMetadataParser, run_as_sync -from metamist.parser.generic_parser import ParsedSample, ParsedSequencingGroup +from metamist.parser.generic_parser import ( + DefaultSequencing, + ParsedSample, + ParsedSequencingGroup, +) logger = logging.getLogger(__file__) logger.addHandler(logging.StreamHandler()) @@ -37,10 +41,12 @@ def __init__( self, search_locations: List[str], project: str, - default_sequencing_type='genome', - default_sequencing_technology='long-read', - default_sequencing_platform='oxford-nanopore', default_sample_type='blood', + default_sequencing=DefaultSequencing( + seq_type='genome', + technology='long-read', + platform='oxford-nanopore', + ), allow_extra_files_in_search_path=False, ): sequence_meta_map = { @@ -66,9 +72,7 @@ def __init__( sample_name_column=Columns.SAMPLE_ID, reads_column=Columns.PASS_FASTQ_FILENAME, default_sample_type=default_sample_type, - default_sequencing_type=default_sequencing_type, - default_sequencing_technology=default_sequencing_technology, - default_sequencing_platform=default_sequencing_platform, + default_sequencing=default_sequencing, participant_meta_map={}, sample_meta_map={}, assay_meta_map=sequence_meta_map, @@ -93,8 +97,8 @@ def parse_fastqs_structure(fastqs) -> List[List[str]]: """ return [fastqs] - async def group_assays(self, sample: ParsedSample) -> list[ParsedSequencingGroup]: - sequencing_groups = await super().group_assays(sample) + async def get_sample_sequencing_groups(self, sample: ParsedSample) -> list[ParsedSequencingGroup]: + sequencing_groups = await super().get_sample_sequencing_groups(sample) for sequencing_group in sequencing_groups: failed_fastqs: list[str] = [] @@ -128,7 +132,7 @@ async def group_assays(self, sample: ParsedSample) -> list[ParsedSequencingGroup help='The metamist project to import manifest into', ) @click.option('--default-sample-type', default='blood') -@click.option('--default-sequence-type', default='genome') +@click.option('--default-sequencing-type', default='genome') @click.option('--default-sequencing-technology', default='long-read') @click.option('--default-sequencing-platform', default='oxford-nanopore') @click.option( @@ -167,16 +171,18 @@ async def main( if not manifests: raise ValueError('Expected at least 1 manifest') - extra_seach_paths = [m for m in manifests if m.startswith('gs://')] - if extra_seach_paths: - search_path = list(set(search_path).union(set(extra_seach_paths))) + extra_search_paths = [m for m in manifests if m.startswith('gs://')] + if extra_search_paths: + search_path = list(set(search_path).union(set(extra_search_paths))) parser = OntParser( project=project, default_sample_type=default_sample_type, - default_sequencing_type=default_sequencing_type, - default_sequencing_platform=default_sequencing_platform, - default_sequencing_technology=default_sequencing_technology, + default_sequencing=DefaultSequencing( + seq_type=default_sequencing_type, + technology=default_sequencing_technology, + platform=default_sequencing_platform, + ), search_locations=search_path, 
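# As in the parse_existing_cohort and OntParser changes above, callers now pass a single
# DefaultSequencing object instead of separate default_sequencing_* arguments, and any
# field left out falls back to the defaults declared on the class in generic_parser.
# A quick illustration:
from metamist.parser.generic_parser import DefaultSequencing

ds = DefaultSequencing(seq_type='exome')
print(ds.seq_type, ds.technology, ds.platform, ds.facility, ds.library)
# exome short-read illumina None None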
allow_extra_files_in_search_path=allow_extra_files_in_search_path, ) diff --git a/scripts/parse_sample_file_map.py b/scripts/parse_sample_file_map.py index ccc5059a3..e4e4ca7bb 100755 --- a/scripts/parse_sample_file_map.py +++ b/scripts/parse_sample_file_map.py @@ -5,7 +5,7 @@ import click -from metamist.parser.generic_metadata_parser import run_as_sync +from metamist.parser.generic_metadata_parser import DefaultSequencing, run_as_sync from metamist.parser.sample_file_map_parser import SampleFileMapParser __DOC = """ @@ -36,7 +36,7 @@ ) @click.option('--default-sample-type', default='blood') @click.option('--default-sequencing-type', default='wgs') -@click.option('--default-sequence-technology', default='short-read') +@click.option('--default-sequencing-technology', default='short-read') @click.option( '--confirm', is_flag=True, help='Confirm with user input before updating server' ) @@ -67,7 +67,7 @@ async def main( project, default_sample_type='blood', default_sequencing_type='wgs', - default_sequence_technology='short-read', + default_sequencing_technology='short-read', confirm=False, dry_run=False, allow_extra_files_in_search_path=False, @@ -77,15 +77,17 @@ async def main( if not manifests: raise ValueError('Expected at least 1 manifest') - extra_seach_paths = [m for m in manifests if m.startswith('gs://')] - if extra_seach_paths: - search_path = list(set(search_path).union(set(extra_seach_paths))) + extra_search_paths = [m for m in manifests if m.startswith('gs://')] + if extra_search_paths: + search_path = list(set(search_path).union(set(extra_search_paths))) parser = SampleFileMapParser( project=project, default_sample_type=default_sample_type, - default_sequencing_type=default_sequencing_type, - default_sequencing_technology=default_sequence_technology, + default_sequencing=DefaultSequencing( + seq_type=default_sequencing_type, + technology=default_sequencing_technology, + ), search_locations=search_path, allow_extra_files_in_search_path=allow_extra_files_in_search_path, default_reference_assembly_location=ref, diff --git a/scripts/parse_vcgs_manifest.py b/scripts/parse_vcgs_manifest.py index 491640322..78c4691a6 100644 --- a/scripts/parse_vcgs_manifest.py +++ b/scripts/parse_vcgs_manifest.py @@ -114,14 +114,14 @@ def get_sequencing_type(self, row: SingleRow) -> SequenceType: types = [types] if len(types) <= 0: if ( - self.default_sequencing_type is None - or self.default_sequencing_type.lower() == 'none' + self.default_sequencing.seq_type is None + or self.default_sequencing.seq_type.lower() == 'none' ): raise ValueError( f"Couldn't detect sequence type for sample {sample_id}, and " 'no default was available.' 
) - return SequenceType(self.default_sequencing_type) + return SequenceType(self.default_sequencing.seq_type) if len(types) > 1: raise ValueError( f'Multiple library types for same sample {sample_id}, ' diff --git a/setup.py b/setup.py index 1c1d579c0..d49473b98 100644 --- a/setup.py +++ b/setup.py @@ -38,6 +38,7 @@ # for get id-token 'cpg-utils >= 4.9.4', 'gql[aiohttp,requests]', + 'tabulate >= 0.9.0' ], entry_points={ 'metamist_parser': [ diff --git a/test/test_parse_existing_cohort.py b/test/test_parse_existing_cohort.py index 8fb169803..a40989857 100644 --- a/test/test_parse_existing_cohort.py +++ b/test/test_parse_existing_cohort.py @@ -299,7 +299,7 @@ async def test_genome_sequencing_type(self): allow_missing_files=True, sequencing_type='genome', ) - self.assertEqual(parser.default_sequencing_type, 'genome') + self.assertEqual(parser.default_sequencing.seq_type, 'genome') @run_as_sync async def test_exome_sequencing_type(self): @@ -314,7 +314,7 @@ async def test_exome_sequencing_type(self): allow_missing_files=True, sequencing_type='exome', ) - self.assertEqual(parser.default_sequencing_type, 'exome') + self.assertEqual(parser.default_sequencing.seq_type, 'exome') @run_as_sync @patch('metamist.parser.generic_parser.query_async') diff --git a/test/test_parse_file_map.py b/test/test_parse_file_map.py index 22e7b700f..ff1aa5046 100644 --- a/test/test_parse_file_map.py +++ b/test/test_parse_file_map.py @@ -2,7 +2,7 @@ from test.testbase import DbIsolatedTest, run_as_sync from unittest.mock import patch -from metamist.parser.generic_parser import ParsedParticipant +from metamist.parser.generic_parser import DefaultSequencing, ParsedParticipant from metamist.parser.sample_file_map_parser import SampleFileMapParser @@ -25,7 +25,7 @@ async def test_single_row_fastq(self, mock_graphql_query): parser = SampleFileMapParser( search_locations=[], project=self.project_name, - default_sequencing_technology='short-read', + default_sequencing=DefaultSequencing(), ) fs = ['.filename-R1.fastq.gz', '.filename-R2.fastq.gz'] parser.filename_map = {k: 'gs://BUCKET/FAKE/' + k for k in fs} @@ -91,7 +91,7 @@ async def test_to_external(self, mock_graphql_query): parser = SampleFileMapParser( search_locations=[], project=self.project_name, - default_sequencing_technology='short-read', + default_sequencing=DefaultSequencing(), ) fs = ['.filename-R1.fastq.gz', '.filename-R2.fastq.gz'] parser.filename_map = {k: 'gs://BUCKET/FAKE/' + k for k in fs} @@ -148,7 +148,7 @@ async def test_two_rows_with_provided_checksums(self, mock_graphql_query): self.maxDiff = None self.assertDictEqual({}, participants[0].samples[0].meta) - expected_sequence1_reads = [ + expected_assay1_reads = [ { 'location': 'gs://BUCKET/FAKE/.filename-R1.fastq.gz', 'basename': '.filename-R1.fastq.gz', @@ -168,11 +168,11 @@ async def test_two_rows_with_provided_checksums(self, mock_graphql_query): ] self.assertListEqual( - expected_sequence1_reads, + expected_assay1_reads, participants[0].samples[0].sequencing_groups[0].assays[0].meta['reads'], ) - expected_sequence2_reads = [ + expected_assay2_reads = [ { 'location': 'gs://BUCKET/FAKE/.filename-R1.fastq.gz', 'basename': '.filename-R1.fastq.gz', @@ -191,6 +191,164 @@ async def test_two_rows_with_provided_checksums(self, mock_graphql_query): }, ] self.assertListEqual( - expected_sequence2_reads, + expected_assay2_reads, participants[1].samples[0].sequencing_groups[0].assays[0].meta['reads'], ) + + @run_as_sync + @patch('metamist.parser.generic_parser.query_async') + async def test_valid_rna_rows(self, 
mock_graphql_query): + """ + Test importing a single row of rna data + """ + + mock_graphql_query.side_effect = self.run_graphql_query_async + + rows = [ + 'Sample ID\tFilenames\tType\tfacility\tlibrary\tend_type\tread_length', + '\t.filename-R1.fastq.gz,.filename-R2.fastq.gz\tpolyarna\tVCGS\tTSStrmRNA\tpaired\t151', + '\t.filename-R1.fastq.gz\ttotalrna\tVCGS\tTSStrtRNA\tpaired\t151', + '\t.filename-R2.fastq.gz\ttotalrna\tVCGS\tTSStrtRNA\tpaired\t151', + ] + + parser = SampleFileMapParser( + search_locations=[], + # doesn't matter, we're going to mock the call anyway + project=self.project_name, + ) + fs = [ + '.filename-R1.fastq.gz', + '.filename-R2.fastq.gz', + '.filename-R1.fastq.gz', + '.filename-R2.fastq.gz', + ] + parser.filename_map = {k: 'gs://BUCKET/FAKE/' + k for k in fs} + parser.skip_checking_gcs_objects = True + + file_contents = '\n'.join(rows) + summary, samples = await parser.parse_manifest( + StringIO(file_contents), delimiter='\t', dry_run=True + ) + + self.assertEqual(0, summary['participants']['insert']) + self.assertEqual(0, summary['participants']['update']) + self.assertEqual(2, summary['samples']['insert']) + self.assertEqual(0, summary['samples']['update']) + self.assertEqual(2, summary['assays']['insert']) + self.assertEqual(0, summary['assays']['update']) + self.maxDiff = None + + self.assertEqual('polyarna', samples[0].sequencing_groups[0].sequencing_type) + expected_sg1_meta = { + 'sequencing_facility': 'VCGS', + 'sequencing_library': 'TSStrmRNA', + 'read_end_type': 'paired', + 'read_length': 151 + } + self.assertDictEqual( + expected_sg1_meta, + samples[0].sequencing_groups[0].meta, + ) + + self.assertEqual('totalrna', samples[1].sequencing_groups[0].sequencing_type) + expected_sg2_meta = { + 'sequencing_facility': 'VCGS', + 'sequencing_library': 'TSStrtRNA', + 'read_end_type': 'paired', + 'read_length': 151 + } + self.assertDictEqual( + expected_sg2_meta, + samples[1].sequencing_groups[0].meta, + ) + + @run_as_sync + @patch('metamist.parser.generic_parser.query_async') + async def test_invalid_rna_row(self, mock_graphql_query): + """ + Test importing a single row of rna data + """ + + mock_graphql_query.side_effect = self.run_graphql_query_async + + rows = [ + 'Sample ID\tFilenames\tType', + '\t.filename-R1.fastq.gz,.filename-R2.fastq.gz\tpolyarna' + ] + + parser = SampleFileMapParser( + search_locations=[], + # doesn't matter, we're going to mock the call anyway + project=self.project_name, + ) + fs = [ + '.filename-R1.fastq.gz', + '.filename-R2.fastq.gz', + ] + parser.filename_map = {k: 'gs://BUCKET/FAKE/' + k for k in fs} + parser.skip_checking_gcs_objects = True + + file_contents = '\n'.join(rows) + with self.assertRaises( + ValueError + ): + _, _ = await parser.parse_manifest( + StringIO(file_contents), delimiter='\t', dry_run=True + ) + + @run_as_sync + @patch('metamist.parser.generic_parser.query_async') + async def test_rna_row_with_default_field_values(self, mock_graphql_query): + """ + Test importing a single row of rna data + """ + + mock_graphql_query.side_effect = self.run_graphql_query_async + + rows = [ + 'Sample ID\tFilenames\tType', + '\t.filename-R1.fastq.gz,.filename-R2.fastq.gz\tpolyarna' + ] + + parser = SampleFileMapParser( + search_locations=[], + # doesn't matter, we're going to mock the call anyway + project=self.project_name, + default_sequencing=DefaultSequencing( + facility='VCGS', + library='TSStrmRNA' + ), + default_read_end_type='paired', + default_read_length=151 + ) + fs = [ + '.filename-R1.fastq.gz', + 
'.filename-R2.fastq.gz', + ] + parser.filename_map = {k: 'gs://BUCKET/FAKE/' + k for k in fs} + parser.skip_checking_gcs_objects = True + + file_contents = '\n'.join(rows) + summary, samples = await parser.parse_manifest( + StringIO(file_contents), delimiter='\t', dry_run=True + ) + + self.assertEqual(0, summary['participants']['insert']) + self.assertEqual(0, summary['participants']['update']) + self.assertEqual(1, summary['samples']['insert']) + self.assertEqual(0, summary['samples']['update']) + self.assertEqual(1, summary['assays']['insert']) + self.assertEqual(0, summary['assays']['update']) + self.maxDiff = None + + self.assertEqual('polyarna', samples[0].sequencing_groups[0].sequencing_type) + expected_sg1_meta = { + 'sequencing_facility': 'VCGS', + 'sequencing_library': 'TSStrmRNA', + 'read_end_type': 'paired', + 'read_length': 151 + } + self.assertDictEqual( + expected_sg1_meta, + samples[0].sequencing_groups[0].meta, + ) From c7f53b2e906693e107bf6e96ac7eacd0ccaf8720 Mon Sep 17 00:00:00 2001 From: EddieLF <34049565+EddieLF@users.noreply.github.com> Date: Mon, 19 Feb 2024 12:06:57 +1100 Subject: [PATCH 5/5] Update sequencing group meta API endpoint fix + bumpversion minor (#684) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update SG meta endpoint to use PATCH, not GET * Bump version: 6.7.0 → 6.8.0 --- .bumpversion.cfg | 2 +- api/routes/sequencing_groups.py | 2 +- api/server.py | 2 +- deploy/python/version.txt | 2 +- setup.py | 2 +- web/package.json | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 6331c36cc..26fed7085 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 6.7.0 +current_version = 6.8.0 commit = True tag = False parse = (?P\d+)\.(?P\d+)\.(?P[A-z0-9-]+) diff --git a/api/routes/sequencing_groups.py b/api/routes/sequencing_groups.py index f3be40236..9a35ee1e4 100644 --- a/api/routes/sequencing_groups.py +++ b/api/routes/sequencing_groups.py @@ -56,7 +56,7 @@ async def get_all_sequencing_group_ids_by_sample_by_type( } -@router.get('/project/{sequencing_group_id}', operation_id='updateSequencingGroup') +@router.patch('/project/{sequencing_group_id}', operation_id='updateSequencingGroup') async def update_sequencing_group( sequencing_group_id: str, sequencing_group: SequencingGroupMetaUpdateModel, diff --git a/api/server.py b/api/server.py index 1879bec50..b5d58209b 100644 --- a/api/server.py +++ b/api/server.py @@ -19,7 +19,7 @@ from db.python.utils import get_logger # This tag is automatically updated by bump2version -_VERSION = '6.7.0' +_VERSION = '6.8.0' logger = get_logger() diff --git a/deploy/python/version.txt b/deploy/python/version.txt index f0e13c509..e029aa99b 100644 --- a/deploy/python/version.txt +++ b/deploy/python/version.txt @@ -1 +1 @@ -6.7.0 +6.8.0 diff --git a/setup.py b/setup.py index d49473b98..7e6ad5d47 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ setup( name=PKG, # This tag is automatically updated by bump2version - version='6.7.0', + version='6.8.0', description='Python API for interacting with the Sample API system', long_description=readme, long_description_content_type='text/markdown', diff --git a/web/package.json b/web/package.json index 40e260617..852999cee 100644 --- a/web/package.json +++ b/web/package.json @@ -1,6 +1,6 @@ { "name": "metamist", - "version": "6.7.0", + "version": "6.8.0", "private": true, "dependencies": { "@apollo/client": "^3.7.3",
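# The second commit above switches the updateSequencingGroup route from GET to PATCH.
# A hedged client-side sketch using requests: the '/project/{sequencing_group_id}' path
# segment comes from the route decorator, but the base URL, auth header, sequencing
# group ID and body shape ({'meta': ...}) are all assumptions for illustration only.
import requests

resp = requests.patch(
    'https://metamist.example.com/api/v1/sequencing-group/project/CPGSG123',
    json={'meta': {'read_length': 151}},
    headers={'Authorization': 'Bearer <token>'},
    timeout=30,
)
resp.raise_for_status()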