From 2252b2786a985c39506850b66c67788f757357dd Mon Sep 17 00:00:00 2001 From: vivbak Date: Wed, 12 Jul 2023 14:15:22 +1000 Subject: [PATCH 01/10] add family sg function + silence the linter --- scripts/create_test_subset.py | 129 +++++++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 24 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index fd1e74381..475aaad3d 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# pylint: disable=too-many-instance-attributes,too-many-locals,unused-argument,wrong-import-order,unused-argument,too-many-arguments +# pylint: disable=too-many-instance-attributes,too-many-locals,too-many-arguments """ Example Invocation @@ -11,13 +11,12 @@ This example will populate acute-care-test with the metamist data for 4 families. """ -from typing import Optional +from typing import Optional, Counter as CounterType import logging import os import random import subprocess import traceback -import typing from collections import Counter import csv @@ -40,6 +39,8 @@ AnalysisStatus, ) +from metamist.graphql import gql, query + logger = logging.getLogger(__file__) logging.basicConfig(format='%(levelname)s (%(name)s %(lineno)s): %(message)s') logger.setLevel(logging.INFO) @@ -99,6 +100,13 @@ This is in addition to the number of families specified in --families and the number of samples specified in -n""", ) +@click.option( + '--noninteractive', + 'noninteractive', + is_flag=True, + default=False, + help='Skip interactive confirmation', +) def main( project: str, samples_n: Optional[int], @@ -106,13 +114,14 @@ def main( skip_ped: Optional[bool] = True, additional_families: Optional[tuple[str]] = None, additional_samples: Optional[tuple[str]] = None, + noninteractive: Optional[bool] = False, ): """ Script creates a test subset for a given project. 
A new project with a prefix -test is created, and for any files in sample/meta, sequence/meta, or analysis/output a copy in the -test namespace is created. """ - samples_n, families_n = _validate_opts(samples_n, families_n) + samples_n, families_n = _validate_opts(samples_n, families_n, noninteractive) _additional_families: list[str] = list(additional_families) _additional_samples: list[str] = list(additional_samples) @@ -123,7 +132,7 @@ def main( } ) logger.info(f'Found {len(all_samples)} samples') - if samples_n and samples_n >= len(all_samples): + if (samples_n and samples_n >= len(all_samples)) and not noninteractive: resp = str( input( f'Requesting {samples_n} samples which is >= ' @@ -225,13 +234,14 @@ def main( for a_type, analysis_by_sid in analysis_by_sid_by_type.items(): try: # TODO: fix this - analyses: list[ - dict[str, str] - ] = aapi.get_latest_analysis_for_samples_and_type( - project=project, - analysis_type=str(a_type), - request_body=sample_ids, - ) + # analyses: list[ + # dict[str, str] + # ] = aapi.get_latest_analysis_for_samples_and_type( + # project=project, + # analysis_type=str(a_type), + # request_body=sample_ids, + # ) + analyses: list[dict[str, str]] = [] except exceptions.ApiException: traceback.print_exc() else: @@ -442,8 +452,7 @@ def get_map_ipid_esid( ip_es_map = [] for ip_is_pair in ip_is_map: - samples_per_participant = [] - samples_per_participant.append(ip_is_pair[0]) + samples_per_participant = [ip_is_pair[0]] for isid in ip_is_pair[1:]: if isid in is_es_map: samples_per_participant.append(is_es_map[isid]) @@ -455,10 +464,9 @@ def get_map_ipid_esid( return external_sample_internal_participant_map -def get_samples_for_families(project: str, additional_families: list[str]): +def get_samples_for_families(project: str, additional_families: list[str]) -> list[str]: """Returns the samples that belong to a list of families""" - samples: list[str] = [] full_pedigree = fapi.get_pedigree( project=project, 
replace_with_participant_external_ids=False, @@ -479,7 +487,7 @@ def get_samples_for_families(project: str, additional_families: list[str]): } ) - samples = [sample['id'] for sample in sample_objects] + samples: list[str] = [sample['id'] for sample in sample_objects] return samples @@ -487,9 +495,8 @@ def get_samples_for_families(project: str, additional_families: list[str]): def get_fams_for_samples( project: str, additional_samples: Optional[list[str]] = None, -): +) -> list[str]: """Returns the families that a list of samples belong to""" - fams: set[str] = set() sample_objects = sapi.get_samples( body_get_samples={ 'project_ids': [project], @@ -505,7 +512,7 @@ def get_fams_for_samples( replace_with_family_external_ids=True, ) - fams = { + fams: set[str] = { fam['family_id'] for fam in full_pedigree if str(fam['individual_id']) in pids } @@ -526,7 +533,7 @@ def _normalise_map(unformatted_map: list[list[str]]) -> dict[str, str]: def _validate_opts( - samples_n: int, families_n: int + samples_n: int, families_n: int, noninteractive: bool ) -> tuple[Optional[int], Optional[int]]: if samples_n is not None and families_n is not None: raise click.BadParameter('Please specify only one of --samples or --families') @@ -541,7 +548,7 @@ def _validate_opts( if families_n is not None and families_n < 1: raise click.BadParameter('Please specify --families higher than 0') - if families_n is not None and families_n >= 30: + if (families_n is not None and families_n >= 30) and not noninteractive: resp = str( input( f'You requested a subset of {families_n} families. ' @@ -551,7 +558,7 @@ def _validate_opts( if resp.lower() != 'y': raise SystemExit() - if samples_n is not None and samples_n >= 100: + if (samples_n is not None and samples_n >= 100) and not noninteractive: resp = str( input( f'You requested a subset of {samples_n} samples. 
' @@ -565,7 +572,7 @@ def _validate_opts( def _print_fam_stats(families: list[dict[str, str]]): family_sizes = Counter([fam['family_id'] for fam in families]) - fam_by_size: typing.Counter[int] = Counter() + fam_by_size: CounterType[int] = Counter() # determine number of singles, duos, trios, etc for fam in family_sizes: fam_by_size[family_sizes[fam]] += 1 @@ -727,6 +734,80 @@ def file_exists(path: str) -> bool: return os.path.exists(path) +def _get_sgs_for_families( + project: str, + families_n: int, + additional_families, +) -> list[str]: + """Returns specific sequencing groups to be included in the test project.""" + + included_sgids: list = [] + _num_families_to_subset: int = None + _randomly_selected_families: list = [] + + # Case 1: If neither families_n nor _additional_families + if not families_n and not additional_families: + return included_sgids + + # Case 2: If families_n but not _additional_families + if families_n and not additional_families: + _num_families_to_subset = families_n + + # Case 3: If both families_n and _additional_families + if families_n and additional_families: + _num_families_to_subset = families_n - len(additional_families) + + QUERY_FAMILY_SGID = gql( + """ + query FamilyQuery($project: String!) { + project(name: $project) { + families { + id + externalId + participants { + samples { + sequencingGroups { + id + } + } + } + } + + } + } + """ + ) + + family_sgid_output = query(QUERY_FAMILY_SGID, {'project': project}) + + # 1. Remove the families in _families_to_subset + all_family_sgids = family_sgid_output.get('project').get('families') + _filtered_family_sgids = [ + fam for fam in all_family_sgids if fam['externalId'] not in additional_families + ] + _user_input_families = [ + fam for fam in all_family_sgids if fam['externalId'] in additional_families + ] + + # 2. 
Randomly select _num_families_to_subset from the remaining families + if _num_families_to_subset: + _randomly_selected_families = random.sample( + _filtered_family_sgids, _num_families_to_subset + ) + + # 3. Combine the families in _families_to_subset with the randomly selected families & return sequencing group ids + + _all_families_to_subset = _randomly_selected_families + _user_input_families + + for fam in _all_families_to_subset: + for participant in fam['participants']: + for sample in participant['samples']: + for sg in sample['sequencingGroups']: + included_sgids.append(sg['id']) + + return included_sgids + + if __name__ == '__main__': # pylint: disable=no-value-for-parameter main() From 24fa8d390c23d35e87692553992e8f3a1583aa27 Mon Sep 17 00:00:00 2001 From: vivbak Date: Thu, 20 Jul 2023 09:38:08 +1000 Subject: [PATCH 02/10] Lots of changes tbh --- scripts/create_test_subset.py | 736 +++++++++++++++++++--------------- 1 file changed, 405 insertions(+), 331 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 475aaad3d..0b2e6a61d 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -16,14 +16,12 @@ import os import random import subprocess -import traceback from collections import Counter import csv import click from google.cloud import storage -from metamist import exceptions from metamist.apis import ( AnalysisApi, AssayApi, @@ -32,11 +30,12 @@ ParticipantApi, ) from metamist.models import ( - BodyGetAssaysByCriteria, AssayUpsert, SampleUpsert, Analysis, AnalysisStatus, + AnalysisUpdateModel, + SequencingGroupUpsert, ) from metamist.graphql import gql, query @@ -53,6 +52,111 @@ DEFAULT_SAMPLES_N = 10 +SG_ID_QUERY = gql( + """ + query getSGIds($project: String!) 
{ + project(name: $project) { + samples{ + id + externalId + sequencingGroups { + id + } + } + } + } + """ +) + +QUERY_ALL_DATA = gql( + """ + query getAllData($project: String!, $sids: [String!]) { + project(name: $project) { + samples(id: {in_: $sids}) { + id + meta + type + externalId + participant { + externalId + id + karyotype + meta + reportedGender + reportedSex + } + sequencingGroups{ + id + meta + platform + technology + type + assays { + id + meta + type + } + analyses { + active + id + meta + output + status + timestampCompleted + type + } + } + } + } + } + """ +) + +# TODO: We can change this to filter external sample ids +EXISTING_DATA_QUERY = gql( + """ + query getExistingData($project: String!) { + project(name: $project) { + samples{ + id + externalId + sequencingGroups { + id + type + assays { + id + type + } + analyses { + id + type + } + } + } + } + } + """ +) + +QUERY_FAMILY_SGID = gql( + """ + query FamilyQuery($project: String!) { + project(name: $project) { + families { + id + externalId + participants { + samples { + id + } + } + } + + } + } +""" +) + @click.command() @click.option( @@ -125,205 +229,313 @@ def main( _additional_families: list[str] = list(additional_families) _additional_samples: list[str] = list(additional_samples) - all_samples = sapi.get_samples( - body_get_samples={ - 'project_ids': [project], - 'active': True, - } + specific_sids = _get_sids_for_families( + project, + families_n, + _additional_families, ) - logger.info(f'Found {len(all_samples)} samples') - if (samples_n and samples_n >= len(all_samples)) and not noninteractive: - resp = str( - input( - f'Requesting {samples_n} samples which is >= ' - f'than the number of available samples ({len(all_samples)}). ' - f'The test project will be a copy of the production project. 
' - f'Please confirm (y): ' - ) - ) - if resp.lower() != 'y': - raise SystemExit() + if not samples_n and not families_n: + samples_n = DEFAULT_SAMPLES_N + if not samples_n and families_n: + samples_n = 0 - random.seed(42) # for reproducibility + specific_sids = specific_sids + _additional_samples - pid_sid = papi.get_external_participant_id_to_internal_sample_id(project) - sample_id_by_participant_id = dict(pid_sid) + # 1. Query all the SGIDS + sid_output = query(SG_ID_QUERY, variables={'project': project}) + all_sids = [sid['id'] for sid in sid_output.get('project').get('samples')] - if families_n: - if _additional_samples: - _additional_families.extend( - get_fams_for_samples( - project, - _additional_samples, - ) - ) - fams = fapi.get_families(project) - _additional_families_set = set(_additional_families) - families_to_subset = [ - family['id'] - for family in fams - if family['external_id'] not in _additional_families_set - ] - full_pedigree = fapi.get_pedigree( - project=project, internal_family_ids=families_to_subset - ) - external_family_ids = _get_random_families(full_pedigree, families_n) - external_family_ids.extend(_additional_families) - internal_family_ids = [ - fam['id'] for fam in fams if fam['external_id'] in external_family_ids - ] - pedigree = fapi.get_pedigree( - project=project, internal_family_ids=internal_family_ids + # 2. Subtract the specific_sgs from all the sgs + sgids_after_inclusions = list(set(all_sids) - set(specific_sids)) + # 3. 
Randomly select from the remaining sgs + random_sgs: list[str] = [] + random.seed(42) # for reproducibility + if (samples_n - len(specific_sids)) > 0: + random_sgs = random.sample( + sgids_after_inclusions, samples_n - len(specific_sids) ) - _print_fam_stats(pedigree) - - p_ids = [ped['individual_id'] for ped in pedigree] - sample_ids = [ - sample - for (participant, sample) in sample_id_by_participant_id.items() - if participant in p_ids - ] - sample_set = set(sample_ids) - samples = [s for s in all_samples if s['id'] in sample_set] - - else: - if _additional_families: - _additional_samples.extend( - get_samples_for_families(project, _additional_families) - ) - - _additional_samples_set = set(_additional_samples) - samples_to_subset = [ - sample - for sample in all_samples - if sample['id'] not in _additional_samples_set - ] - samples_to_add = [ - sample for sample in all_samples if sample['id'] in _additional_samples_set - ] - samples = random.sample(list(samples_to_subset), samples_n) - samples.extend(samples_to_add) - sample_ids = [s['id'] for s in samples] - - logger.info( - f'Subset to {len(samples)} samples (internal ID / external ID): ' - f'{_pretty_format_samples(samples)}' + # 4. Add the specific_sgs to the randomly selected sgs + final_subset_sids = specific_sids + random_sgs + # 5. 
Query all the samples from the selected sgs + original_project_subset_data = query( + QUERY_ALL_DATA, {'project': project, 'sids': final_subset_sids} ) # Populating test project target_project = project + '-test' - logger.info('Checking any existing test samples in the target test project') - test_sample_by_external_id = _process_existing_test_samples(target_project, samples) - - try: - seq_infos: list[dict[str, str]] = assayapi.get_assays_by_criteria( - body_get_assays_by_criteria=BodyGetAssaysByCriteria( - sample_ids=sample_ids, - ) - ) - except exceptions.ApiException: - seq_info_by_s_id = {} - else: - seq_info_by_s_id = {seq['sample_id']: seq for seq in seq_infos} - - analysis_by_sid_by_type: dict[str, dict[str, dict[str, str]]] = { - 'cram': {}, - 'gvcf': {}, - } - for a_type, analysis_by_sid in analysis_by_sid_by_type.items(): - try: - # TODO: fix this - # analyses: list[ - # dict[str, str] - # ] = aapi.get_latest_analysis_for_samples_and_type( - # project=project, - # analysis_type=str(a_type), - # request_body=sample_ids, - # ) - analyses: list[dict[str, str]] = [] - except exceptions.ApiException: - traceback.print_exc() - else: - for a in analyses: - analysis_by_sid[a['sample_ids'][0]] = a - logger.info(f'Will copy {a_type} analysis entries: {analysis_by_sid}') + # Pull Participant Data + participant_data = [] + participant_ids: list = [] + for sg in original_project_subset_data.get('project').get('samples'): + participant = sg.get('participant') + if participant: + participant_data.append(participant) + participant_ids.append(participant.get('externalId')) # Parse Families & Participants - participant_ids = [int(sample['participant_id']) for sample in samples] if skip_ped: # If no family data is available, only the participants should be transferred. 
- external_participant_ids = transfer_participants( - initial_project=project, + upserted_participant_map = transfer_participants( target_project=target_project, - participant_ids=participant_ids, + participant_data=participant_data, ) else: family_ids = transfer_families(project, target_project, participant_ids) - external_participant_ids = transfer_ped(project, target_project, family_ids) + transfer_ped(project, target_project, family_ids) - external_sample_internal_participant_map = get_map_ipid_esid( - project, target_project, external_participant_ids + existing_data = query(EXISTING_DATA_QUERY, {'project': target_project}) + + samples = original_project_subset_data.get('project').get('samples') + transfer_samples_sgs_assays( + samples, existing_data, upserted_participant_map, target_project, project ) + transfer_analyses(samples, existing_data, target_project, project) - new_sample_map = {} +def transfer_samples_sgs_assays( + samples: dict, + existing_data: dict, + upserted_participant_map: dict[str, int], + target_project: str, + project: str, +): + """ + Transfer samples, sequencing groups, and assays from the original project to the + test project. 
+ """ for s in samples: + sample_sgs: list[SequencingGroupUpsert] = [] + for sg in s.get('sequencingGroups'): + sg_assays: list[AssayUpsert] = [] + _existing_sg = _get_existing_sg( + existing_data, s.get('externalId'), sg.get('type') + ) + _existing_sgid = _existing_sg.get('id', None) + for assay in sg.get('assays'): + _existing_assay: dict[str, str] = {} + if _existing_sgid: + _existing_assay = _get_existing_assay( + existing_data, + s.get('externalId'), + _existing_sgid, + assay.get('type'), + ) + assay_upsert = AssayUpsert( + type=assay.get('type'), + id=_existing_assay.get('id', None), + external_ids=assay.get('externalIds') or {}, + # sample_id=self.s, + meta=assay.get('meta'), + ) + sg_assays.append(assay_upsert) + sg_upsert = SequencingGroupUpsert( + id=_existing_sgid, + external_ids=sg.get('externalIds') or {}, + meta=sg.get('meta'), + platform=sg.get('platform'), + technology=sg.get('technology'), + type=sg.get('type'), + assays=sg_assays, + ) + sample_sgs.append(sg_upsert) + + _sample_type = None if s['type'] == 'None' else s['type'] + _existing_sid: str = None + _existing_sample = _get_existing_sample(existing_data, s['externalId']) + if _existing_sample: + _existing_sid = _existing_sample['id'] + + _existing_pid: int = None + if s['participant']: + _existing_pid = upserted_participant_map[s['participant']['externalId']] + + sample_upsert = SampleUpsert( + external_id=s['externalId'], + type=_sample_type or None, + meta=(_copy_files_in_dict(s['meta'], project) or {}), + participant_id=_existing_pid, + sequencing_groups=sample_sgs, + id=_existing_sid, + ) + logger.info(f'Processing sample {s["id"]}') + logger.info('Creating test sample entry') + sapi.create_sample( + project=target_project, + sample_upsert=sample_upsert, + ) - if s['external_id'] in test_sample_by_external_id: - new_s_id = test_sample_by_external_id[s['external_id']]['id'] - logger.info(f'Sample already in test project, with ID {new_s_id}') - new_sample_map[s['id']] = new_s_id - else: - 
logger.info('Creating test sample entry') - new_s_id = sapi.create_sample( - project=target_project, - sample_upsert=SampleUpsert( - external_id=s['external_id'], - type=s['type'], - meta=(_copy_files_in_dict(s['meta'], project) or {}), - participant_id=external_sample_internal_participant_map[ - s['external_id'] - ], - ), +def transfer_analyses( + samples: dict, existing_data: dict, target_project: str, project: str +): + """ + This function will transfer the analyses from the original project to the test project. + """ + new_sg_data = query(SG_ID_QUERY, {'project': target_project}) + + new_sg_map = {} + for s in new_sg_data.get('project').get('samples'): + sg_ids: list = [] + for sg in s.get('sequencingGroups'): + sg_ids.append(sg.get('id')) + new_sg_map[s.get('externalId')] = sg_ids + + for s in samples: + for sg in s['sequencingGroups']: + _existing_sg = _get_existing_sg( + existing_data, s.get('externalId'), sg.get('type') ) - new_sample_map[s['id']] = new_s_id - - seq_info = seq_info_by_s_id.get(s['id']) - if seq_info: - logger.info('Processing sequence entry') - new_meta = _copy_files_in_dict(seq_info.get('meta'), project) - logger.info('Creating sequence entry in test') - assayapi.create_assay( - assay_upsert=AssayUpsert( - sample_id=new_s_id, - meta=new_meta, - type=seq_info['type'], - external_ids=seq_info['external_ids'], - ), - ) + _existing_sgid = _existing_sg.get('id', None) + for analysis in sg['analyses']: + if analysis['type'] not in ['cram', 'gvcf']: + # Currently the create_test_subset script only handles crams or gvcf files. 
+ continue + + _existing_analysis_id = None + _existing_analysis: dict = {} + if _existing_sgid: + _existing_analysis = _get_existing_analysis( + existing_data, + s['externalId'], + _existing_sgid, + analysis['type'], + ) + _existing_analysis_id = _existing_analysis.get('id', None) + if _existing_analysis_id: + am = AnalysisUpdateModel( + type=analysis['type'], + output=_copy_files_in_dict( + analysis['output'], + project, + (str(sg['id']), new_sg_map[s['externalId']][0]), + ), + status=AnalysisStatus(analysis['status'].lower()), + sequencing_group_ids=new_sg_map[s['externalId']], + meta=analysis['meta'], + ) + aapi.update_analysis( + analysis_id=_existing_analysis_id, + analysis_update_model=am, + ) + else: + am = Analysis( + type=analysis['type'], + output=_copy_files_in_dict( + analysis['output'], + project, + (str(sg['id']), new_sg_map[s['externalId']][0]), + ), + status=AnalysisStatus(analysis['status'].lower()), + sequencing_group_ids=new_sg_map[s['externalId']], + meta=analysis['meta'], + ) + + logger.info(f'Creating {analysis["type"]}analysis entry in test') + aapi.create_analysis(project=target_project, analysis=am) + + +def _get_existing_sample(data: dict, sample_id: str) -> dict: + for sample in data.get('project').get('samples'): + if sample.get('externalId') == sample_id: + return sample + + return None + + +def _get_existing_sg( + existing_data: dict, sample_id: str, sg_type: str = None, sg_id: str = None +) -> dict: + if not sg_type and not sg_id: + raise ValueError('Must provide sg_type or sg_id when getting exsisting sg') + sample = _get_existing_sample(existing_data, sample_id) + for sg in sample.get('sequencingGroups'): + if sg_id and sg.get('id') == sg_id: + return sg + if sg_type and sg.get('type') == sg_type: + return sg + + return None + + +def _get_existing_assay( + data: dict, sample_id: str, sg_id: str, assay_type: str +) -> dict: + sg = _get_existing_sg( + existing_data=data, + sample_id=sample_id, + sg_id=sg_id, + ) + for assay in 
sg.get('assays'): + if assay.get('type') == assay_type: + return assay - for a_type in ['cram', 'gvcf']: - analysis = analysis_by_sid_by_type[a_type].get(s['id']) - if analysis: - logger.info(f'Processing {a_type} analysis entry') - am = Analysis( - type=str(a_type), - output=_copy_files_in_dict( - analysis['output'], - project, - (str(s['id']), new_sample_map[s['id']]), - ), - status=AnalysisStatus(analysis['status']), - sample_ids=[new_sample_map[s['id']]], - meta=analysis['meta'], - ) - logger.info(f'Creating {a_type} analysis entry in test') - aapi.create_analysis(project=target_project, analysis=am) - logger.info(f'-') + return None + + +def _get_existing_analysis( + data: dict, sample_id: str, sg_id: str, analysis_type: str +) -> dict: + sg = _get_existing_sg(existing_data=data, sample_id=sample_id, sg_id=sg_id) + for analysis in sg.get('analyses'): + if analysis.get('type') == analysis_type: + return analysis + return None + + +def _get_sids_for_families( + project: str, + families_n: int, + additional_families, +) -> list[str]: + """Returns specific sequencing groups to be included in the test project.""" + + included_sids: list = [] + _num_families_to_subset: int = None + _randomly_selected_families: list = [] + + # Case 1: If neither families_n nor _additional_families + if not families_n and not additional_families: + return included_sids + + # Case 2: If families_n but not _additional_families + if families_n and not additional_families: + _num_families_to_subset = families_n + + # Case 3: If both families_n and _additional_families + if families_n and additional_families: + _num_families_to_subset = families_n - len(additional_families) + + family_sgid_output = query(QUERY_FAMILY_SGID, {'project': project}) + + # 1. 
Remove the families in _families_to_subset + all_family_sgids = family_sgid_output.get('project').get('families') + _filtered_family_sgids = [ + fam for fam in all_family_sgids if fam['externalId'] not in additional_families + ] + _user_input_families = [ + fam for fam in all_family_sgids if fam['externalId'] in additional_families + ] + + # TODO: Replace this with the nice script that randomly selects better :) + # 2. Randomly select _num_families_to_subset from the remaining families + if _num_families_to_subset: + _randomly_selected_families = random.sample( + _filtered_family_sgids, _num_families_to_subset + ) + + # 3. Combine the families in _families_to_subset with the randomly selected families & return sequencing group ids + + _all_families_to_subset = _randomly_selected_families + _user_input_families + + for fam in _all_families_to_subset: + for participant in fam['participants']: + for sample in participant['samples']: + included_sids.append(sample['id']) + + return included_sids def transfer_families( @@ -391,14 +603,10 @@ def transfer_ped( def transfer_participants( - initial_project: str, target_project: str, participant_ids: list[int] -) -> list[str]: + target_project: str, + participant_data, +) -> dict[str, int]: """Transfers relevant participants between projects""" - - current_participants = papi.get_participants( - initial_project, - body_get_participants={'internal_participant_ids': participant_ids}, - ) existing_participants = papi.get_participants(target_project) target_project_epids = [ @@ -406,11 +614,19 @@ def transfer_participants( ] participants_to_transfer = [] - for participant in current_participants: - if participant['external_id'] not in target_project_epids: + for participant in participant_data: + if participant['externalId'] not in target_project_epids: # Participants with id field will be updated & those without will be inserted del participant['id'] - transfer_participant = {k: v for k, v in participant.items() if v is not 
None} + # transfer_participant = {k: v for k, v in participant.items() if v is not None} + transfer_participant = { + 'external_id': participant['externalId'], + 'meta': participant.get('meta') or {}, + 'karyotype': participant.get('karyotype'), + 'reported_gender': participant.get('reportedGender'), + 'reported_sex': participant.get('reportedSex'), + 'id': participant.get('id'), + } # Participants are being created before the samples are, so this will be empty for now. transfer_participant['samples'] = [] participants_to_transfer.append(transfer_participant) @@ -418,50 +634,14 @@ def transfer_participants( upserted_participants = papi.upsert_participants( target_project, participant_upsert=participants_to_transfer ) - return list(upserted_participants.keys()) - - -def get_map_ipid_esid( - project: str, target_project: str, external_participant_ids: list[str] -) -> dict[str, str]: - """Intermediate steps to determine the mapping of esid to ipid - Acronyms - ep : external participant id - ip : internal participant id - es : external sample id - is : internal sample id - """ - - # External PID: Internal PID - ep_ip_map = papi.get_participant_id_map_by_external_ids( - target_project, request_body=external_participant_ids - ) - - # External PID : Internal SID - ep_is_map = papi.get_external_participant_id_to_internal_sample_id(project) - - # Internal PID : Internal SID - ip_is_map = [] - for ep_is_pair in ep_is_map: - if ep_is_pair[0] in ep_ip_map: - ep_is_pair[0] = ep_ip_map[ep_is_pair[0]] - ip_is_map.append(ep_is_pair) - # Internal PID : External SID - is_es_map = sapi.get_all_sample_id_map_by_internal(project) + external_to_internal_participant_id_map: dict[str, int] = {} - ip_es_map = [] - for ip_is_pair in ip_is_map: - samples_per_participant = [ip_is_pair[0]] - for isid in ip_is_pair[1:]: - if isid in is_es_map: - samples_per_participant.append(is_es_map[isid]) - ip_es_map.append(samples_per_participant) - - # External SID : Internal PID (Normalised) - 
external_sample_internal_participant_map = _normalise_map(ip_es_map) - - return external_sample_internal_participant_map + for participant in upserted_participants: + external_to_internal_participant_id_map[ + participant['external_id'] + ] = participant['id'] + return external_to_internal_participant_id_map def get_samples_for_families(project: str, additional_families: list[str]) -> list[str]: @@ -685,38 +865,6 @@ def _pretty_format_samples(samples: list[dict[str, str]]) -> str: return ', '.join(f"{s['id']}/{s['external_id']}" for s in samples) -def _process_existing_test_samples( - test_project: str, samples: list[dict[str, str]] -) -> dict[str, dict[str, str]]: - """ - Removes samples that need to be removed and returns those that need to be kept - """ - test_samples = sapi.get_samples( - body_get_samples={ - 'project_ids': [test_project], - 'active': True, - } - ) - external_ids = [s['external_id'] for s in samples] - test_samples_to_remove = [ - s for s in test_samples if s['external_id'] not in external_ids - ] - test_samples_to_keep = [s for s in test_samples if s['external_id'] in external_ids] - if test_samples_to_remove: - logger.info( - f'Removing test samples: {_pretty_format_samples(test_samples_to_remove)}' - ) - for s in test_samples_to_remove: - sapi.update_sample(s['id'], SampleUpsert(active=False)) - - if test_samples_to_keep: - logger.info( - f'Test samples already exist: {_pretty_format_samples(test_samples_to_keep)}' - ) - - return {s['external_id']: s for s in test_samples_to_keep} - - def file_exists(path: str) -> bool: """ Check if the object exists, where the object can be: @@ -734,80 +882,6 @@ def file_exists(path: str) -> bool: return os.path.exists(path) -def _get_sgs_for_families( - project: str, - families_n: int, - additional_families, -) -> list[str]: - """Returns specific sequencing groups to be included in the test project.""" - - included_sgids: list = [] - _num_families_to_subset: int = None - _randomly_selected_families: list = 
[] - - # Case 1: If neither families_n nor _additional_families - if not families_n and not additional_families: - return included_sgids - - # Case 2: If families_n but not _additional_families - if families_n and not additional_families: - _num_families_to_subset = families_n - - # Case 3: If both families_n and _additional_families - if families_n and additional_families: - _num_families_to_subset = families_n - len(additional_families) - - QUERY_FAMILY_SGID = gql( - """ - query FamilyQuery($project: String!) { - project(name: $project) { - families { - id - externalId - participants { - samples { - sequencingGroups { - id - } - } - } - } - - } - } - """ - ) - - family_sgid_output = query(QUERY_FAMILY_SGID, {'project': project}) - - # 1. Remove the families in _families_to_subset - all_family_sgids = family_sgid_output.get('project').get('families') - _filtered_family_sgids = [ - fam for fam in all_family_sgids if fam['externalId'] not in additional_families - ] - _user_input_families = [ - fam for fam in all_family_sgids if fam['externalId'] in additional_families - ] - - # 2. Randomly select _num_families_to_subset from the remaining families - if _num_families_to_subset: - _randomly_selected_families = random.sample( - _filtered_family_sgids, _num_families_to_subset - ) - - # 3. Combine the families in _families_to_subset with the randomly selected families & return sequencing group ids - - _all_families_to_subset = _randomly_selected_families + _user_input_families - - for fam in _all_families_to_subset: - for participant in fam['participants']: - for sample in participant['samples']: - for sg in sample['sequencingGroups']: - included_sgids.append(sg['id']) - - return included_sgids - - if __name__ == '__main__': # pylint: disable=no-value-for-parameter main() From b04f6ae7108a82798eee310beb81f17c76f55da2 Mon Sep 17 00:00:00 2001 From: vivbak Date: Thu, 20 Jul 2023 09:51:21 +1000 Subject: [PATCH 03/10] Remove interactivity. 
Remove validation - As we run scripts via AR, the interactivity never makes sense. - Most of the validation is inaccurate now, as those use cases have been handled --- scripts/create_test_subset.py | 38 +++-------------------------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 0b2e6a61d..3f034d9a2 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -204,13 +204,6 @@ This is in addition to the number of families specified in --families and the number of samples specified in -n""", ) -@click.option( - '--noninteractive', - 'noninteractive', - is_flag=True, - default=False, - help='Skip interactive confirmation', -) def main( project: str, samples_n: Optional[int], @@ -218,14 +211,13 @@ def main( skip_ped: Optional[bool] = True, additional_families: Optional[tuple[str]] = None, additional_samples: Optional[tuple[str]] = None, - noninteractive: Optional[bool] = False, ): """ Script creates a test subset for a given project. A new project with a prefix -test is created, and for any files in sample/meta, sequence/meta, or analysis/output a copy in the -test namespace is created. 
""" - samples_n, families_n = _validate_opts(samples_n, families_n, noninteractive) + samples_n, families_n = _validate_opts(samples_n, families_n) _additional_families: list[str] = list(additional_families) _additional_samples: list[str] = list(additional_samples) @@ -713,11 +705,9 @@ def _normalise_map(unformatted_map: list[list[str]]) -> dict[str, str]: def _validate_opts( - samples_n: int, families_n: int, noninteractive: bool + samples_n: int, families_n: int ) -> tuple[Optional[int], Optional[int]]: - if samples_n is not None and families_n is not None: - raise click.BadParameter('Please specify only one of --samples or --families') - + """Validates the options passed to the script""" if samples_n is None and families_n is None: samples_n = DEFAULT_SAMPLES_N logger.info( @@ -725,28 +715,6 @@ def _validate_opts( f'{samples_n} samples' ) - if families_n is not None and families_n < 1: - raise click.BadParameter('Please specify --families higher than 0') - - if (families_n is not None and families_n >= 30) and not noninteractive: - resp = str( - input( - f'You requested a subset of {families_n} families. ' - f'Please confirm (y): ' - ) - ) - if resp.lower() != 'y': - raise SystemExit() - - if (samples_n is not None and samples_n >= 100) and not noninteractive: - resp = str( - input( - f'You requested a subset of {samples_n} samples. 
' - f'Please confirm (y): ' - ) - ) - if resp.lower() != 'y': - raise SystemExit() return samples_n, families_n From 8c7d0ff2917f0d1b6c33663e168498aeee8ad866 Mon Sep 17 00:00:00 2001 From: vivbak Date: Fri, 21 Jul 2023 11:36:53 +1000 Subject: [PATCH 04/10] Fixes + Cleanup --- scripts/create_test_subset.py | 105 ++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 44 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 3f034d9a2..4551d6c2f 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -52,22 +52,6 @@ DEFAULT_SAMPLES_N = 10 -SG_ID_QUERY = gql( - """ - query getSGIds($project: String!) { - project(name: $project) { - samples{ - id - externalId - sequencingGroups { - id - } - } - } - } - """ -) - QUERY_ALL_DATA = gql( """ query getAllData($project: String!, $sids: [String!]) { @@ -157,6 +141,33 @@ """ ) +SG_ID_QUERY = gql( + """ + query getSGIds($project: String!) { + project(name: $project) { + samples{ + id + externalId + sequencingGroups { + id + } + } + } + } + """ +) + +PARTICIPANT_QUERY = """ + query ($project: String!) { + project (externalId: $project) { + participants { + id + externalId + } + } + } + """ + @click.command() @click.option( @@ -221,6 +232,7 @@ def main( _additional_families: list[str] = list(additional_families) _additional_samples: list[str] = list(additional_samples) + # 1. Determine the sids to be moved into -test. specific_sids = _get_sids_for_families( project, families_n, @@ -233,29 +245,26 @@ def main( specific_sids = specific_sids + _additional_samples - # 1. Query all the SGIDS + # 2. Get all sids in project. sid_output = query(SG_ID_QUERY, variables={'project': project}) all_sids = [sid['id'] for sid in sid_output.get('project').get('samples')] - # 2. Subtract the specific_sgs from all the sgs + # 3. Subtract the specific_sgs from all the sgs sgids_after_inclusions = list(set(all_sids) - set(specific_sids)) - # 3. 
Randomly select from the remaining sgs + # 4. Randomly select from the remaining sgs random_sgs: list[str] = [] random.seed(42) # for reproducibility if (samples_n - len(specific_sids)) > 0: random_sgs = random.sample( sgids_after_inclusions, samples_n - len(specific_sids) ) - # 4. Add the specific_sgs to the randomly selected sgs + # 5. Add the specific_sgs to the randomly selected sgs final_subset_sids = specific_sids + random_sgs - # 5. Query all the samples from the selected sgs + # 6. Query all the samples from the selected sgs original_project_subset_data = query( QUERY_ALL_DATA, {'project': project, 'sids': final_subset_sids} ) - # Populating test project - target_project = project + '-test' - # Pull Participant Data participant_data = [] participant_ids: list = [] @@ -265,6 +274,9 @@ def main( participant_data.append(participant) participant_ids.append(participant.get('externalId')) + # Populating test project + target_project = project + '-test' + # Parse Families & Participants if skip_ped: # If no family data is available, only the participants should be transferred. 
@@ -275,7 +287,7 @@ def main( else: family_ids = transfer_families(project, target_project, participant_ids) - transfer_ped(project, target_project, family_ids) + upserted_participant_map = transfer_ped(project, target_project, family_ids) existing_data = query(EXISTING_DATA_QUERY, {'project': target_project}) @@ -304,7 +316,7 @@ def transfer_samples_sgs_assays( _existing_sg = _get_existing_sg( existing_data, s.get('externalId'), sg.get('type') ) - _existing_sgid = _existing_sg.get('id', None) + _existing_sgid = _existing_sg.get('id') if _existing_sg else None for assay in sg.get('assays'): _existing_assay: dict[str, str] = {} if _existing_sgid: @@ -314,9 +326,12 @@ def transfer_samples_sgs_assays( _existing_sgid, assay.get('type'), ) + existing_assay_id = ( + _existing_assay.get('id') if _existing_assay else None + ) assay_upsert = AssayUpsert( type=assay.get('type'), - id=_existing_assay.get('id', None), + id=existing_assay_id, external_ids=assay.get('externalIds') or {}, # sample_id=self.s, meta=assay.get('meta'), @@ -380,13 +395,12 @@ def transfer_analyses( _existing_sg = _get_existing_sg( existing_data, s.get('externalId'), sg.get('type') ) - _existing_sgid = _existing_sg.get('id', None) + _existing_sgid = _existing_sg.get('id') if _existing_sg else None for analysis in sg['analyses']: if analysis['type'] not in ['cram', 'gvcf']: # Currently the create_test_subset script only handles crams or gvcf files. 
continue - _existing_analysis_id = None _existing_analysis: dict = {} if _existing_sgid: _existing_analysis = _get_existing_analysis( @@ -395,7 +409,9 @@ def transfer_analyses( _existing_sgid, analysis['type'], ) - _existing_analysis_id = _existing_analysis.get('id', None) + _existing_analysis_id = ( + _existing_analysis.get('id') if _existing_analysis else None + ) if _existing_analysis_id: am = AnalysisUpdateModel( type=analysis['type'], @@ -443,11 +459,12 @@ def _get_existing_sg( if not sg_type and not sg_id: raise ValueError('Must provide sg_type or sg_id when getting exsisting sg') sample = _get_existing_sample(existing_data, sample_id) - for sg in sample.get('sequencingGroups'): - if sg_id and sg.get('id') == sg_id: - return sg - if sg_type and sg.get('type') == sg_type: - return sg + if sample: + for sg in sample.get('sequencingGroups'): + if sg_id and sg.get('id') == sg_id: + return sg + if sg_type and sg.get('type') == sg_type: + return sg return None @@ -564,20 +581,13 @@ def transfer_families( def transfer_ped( initial_project: str, target_project: str, family_ids: list[int] -) -> list[str]: +) -> dict[str, int]: """Pull pedigree from the input project, and copy to target_project""" ped_tsv = fapi.get_pedigree( initial_project, export_type='tsv', internal_family_ids=family_ids, ) - ped_json = fapi.get_pedigree( - initial_project, - export_type='json', - internal_family_ids=family_ids, - ) - - external_participant_ids = [ped['individual_id'] for ped in ped_json] tmp_ped_tsv = 'tmp_ped.tsv' # Work-around as import_pedigree takes a file. 
with open(tmp_ped_tsv, 'w') as tmp_ped: @@ -591,7 +601,14 @@ def transfer_ped( create_missing_participants=True, ) - return external_participant_ids + # Get map of external participant id to internal + participant_output = query(PARTICIPANT_QUERY, {'project': target_project}) + participant_map = { + participant['externalId']: participant['id'] + for participant in participant_output.get('project').get('participants') + } + + return participant_map def transfer_participants( From 14c0a3ab11e31839004b766203d319905281a0d3 Mon Sep 17 00:00:00 2001 From: Matt Welland Date: Mon, 25 Sep 2023 14:58:35 +1000 Subject: [PATCH 05/10] my code don't jiggle jiggle, it folds (#552) Co-authored-by: vivbak --- scripts/create_test_subset.py | 463 ++++++++++++---------------------- 1 file changed, 164 insertions(+), 299 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index cf64b324b..b1f2993e7 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -1,9 +1,5 @@ #!/usr/bin/env python3 -# type: ignore -# pylint: skip-file - - -# # pylint: disable=too-many-instance-attributes,too-many-locals,unused-argument,wrong-import-order,unused-argument,too-many-arguments +# pylint: disable=too-many-instance-attributes,too-many-locals """ Example Invocation @@ -15,18 +11,18 @@ This example will populate acute-care-test with the metamist data for 4 families. 
""" -from typing import Optional, Counter as CounterType import csv import logging import os import random import subprocess +from argparse import ArgumentParser from collections import Counter -import click from google.cloud import storage -from metamist.apis import AnalysisApi, AssayApi, FamilyApi, ParticipantApi, SampleApi +from metamist.apis import AnalysisApi, AssayApi, SampleApi, FamilyApi, ParticipantApi +from metamist.graphql import gql, query from metamist.models import ( AssayUpsert, SampleUpsert, @@ -36,7 +32,6 @@ SequencingGroupUpsert, ) -from metamist.graphql import gql, query logger = logging.getLogger(__file__) logging.basicConfig(format='%(levelname)s (%(name)s %(lineno)s): %(message)s') @@ -155,7 +150,8 @@ """ ) -PARTICIPANT_QUERY = """ +PARTICIPANT_QUERY = gql( + """ query ($project: String!) { project (externalId: $project) { participants { @@ -165,110 +161,45 @@ } } """ +) -@click.command() -@click.option( - '--project', - required=True, - help='The metamist project ($DATASET)', -) -@click.option( - '-n', - '--samples', - 'samples_n', - type=int, - help='Number of samples to subset', -) -@click.option( - '--families', - 'families_n', - type=int, - help='Minimal number of families to include', -) -# Flag to be used when there isn't available pedigree/family information. -@click.option( - '--skip-ped', - 'skip_ped', - is_flag=True, - default=False, - help='Skip transferring pedigree/family information', -) -@click.option( - '--add-family', - 'additional_families', - type=str, - multiple=True, - help="""Additional families to include. - All samples from these fams will be included. - This is in addition to the number of families specified in - --families and the number of samples specified in -n""", -) -@click.option( - '--add-sample', - 'additional_samples', - type=str, - multiple=True, - help="""Additional samples to include. 
- This is in addition to the number of families specified in - --families and the number of samples specified in -n""", -) -@click.option( - '--noninteractive', - 'noninteractive', - is_flag=True, - default=False, - help='Skip interactive confirmation', -) def main( project: str, - samples_n: Optional[int], - families_n: Optional[int], - skip_ped: Optional[bool] = True, - additional_families: Optional[tuple[str]] = None, - additional_samples: Optional[tuple[str]] = None, - noninteractive: Optional[bool] = False, + samples_n: int, + families_n: int, + additional_families: set[str], + additional_samples: set[str], + skip_ped: bool = True, ): """ Script creates a test subset for a given project. A new project with a prefix -test is created, and for any files in sample/meta, sequence/meta, or analysis/output a copy in the -test namespace is created. """ - samples_n, families_n = _validate_opts(samples_n, families_n, noninteractive) - _additional_families: list[str] = list(additional_families) - _additional_samples: list[str] = list(additional_samples) - - # 1. Determine the sids to be moved into -test. - specific_sids = _get_sids_for_families( - project, - families_n, - _additional_families, - ) - if not samples_n and not families_n: - samples_n = DEFAULT_SAMPLES_N - if not samples_n and families_n: - samples_n = 0 - specific_sids = specific_sids + _additional_samples + if not any([additional_families, additional_samples, samples_n, families_n]): + raise ValueError('Come on, what exactly are you asking for?') + + # for reproducibility + random.seed(42) + + # 1. Find and SG IDs to be moved by Family ID -test. + if families_n or additional_families: + additional_samples.update( + get_sids_for_families(project, families_n, additional_families) + ) # 2. Get all sids in project. sid_output = query(SG_ID_QUERY, variables={'project': project}) - all_sids = [sid['id'] for sid in sid_output.get('project').get('samples')] - - # 3. 
Subtract the specific_sgs from all the sgs - sgids_after_inclusions = list(set(all_sids) - set(specific_sids)) - # 4. Randomly select from the remaining sgs - random_sgs: list[str] = [] - random.seed(42) # for reproducibility - if (samples_n - len(specific_sids)) > 0: - random_sgs = random.sample( - sgids_after_inclusions, samples_n - len(specific_sids) - ) - # 5. Add the specific_sgs to the randomly selected sgs - final_subset_sids = specific_sids + random_sgs - # 6. Query all the samples from the selected sgs + all_sids = {sid['id'] for sid in sid_output.get('project').get('samples')} + + # 3. Randomly select from the remaining sgs + additional_samples.update(random.sample(all_sids - additional_samples, samples_n)) + + # 4. Query all the samples from the selected sgs original_project_subset_data = query( - QUERY_ALL_DATA, {'project': project, 'sids': final_subset_sids} + QUERY_ALL_DATA, {'project': project, 'sids': list(additional_samples)} ) # Pull Participant Data @@ -319,14 +250,14 @@ def transfer_samples_sgs_assays( sample_sgs: list[SequencingGroupUpsert] = [] for sg in s.get('sequencingGroups'): sg_assays: list[AssayUpsert] = [] - _existing_sg = _get_existing_sg( + existing_sg = get_existing_sg( existing_data, s.get('externalId'), sg.get('type') ) - _existing_sgid = _existing_sg.get('id') if _existing_sg else None + _existing_sgid = existing_sg.get('id') if existing_sg else None for assay in sg.get('assays'): _existing_assay: dict[str, str] = {} if _existing_sgid: - _existing_assay = _get_existing_assay( + _existing_assay = get_existing_assay( existing_data, s.get('externalId'), _existing_sgid, @@ -354,23 +285,23 @@ def transfer_samples_sgs_assays( ) sample_sgs.append(sg_upsert) - _sample_type = None if s['type'] == 'None' else s['type'] - _existing_sid: str = None - _existing_sample = _get_existing_sample(existing_data, s['externalId']) - if _existing_sample: - _existing_sid = _existing_sample['id'] + sample_type = None if s['type'] == 'None' else s['type'] 
+ existing_sid: str | None = None + existing_sample = get_existing_sample(existing_data, s['externalId']) + if existing_sample: + existing_sid = existing_sample['id'] - _existing_pid: int = None + existing_pid: int | None = None if s['participant']: - _existing_pid = upserted_participant_map[s['participant']['externalId']] + existing_pid = upserted_participant_map[s['participant']['externalId']] sample_upsert = SampleUpsert( external_id=s['externalId'], - type=_sample_type or None, - meta=(_copy_files_in_dict(s['meta'], project) or {}), - participant_id=_existing_pid, + type=sample_type or None, + meta=(copy_files_in_dict(s['meta'], project) or {}), + participant_id=existing_pid, sequencing_groups=sample_sgs, - id=_existing_sid, + id=existing_sid, ) logger.info(f'Processing sample {s["id"]}') @@ -398,30 +329,27 @@ def transfer_analyses( for s in samples: for sg in s['sequencingGroups']: - _existing_sg = _get_existing_sg( + existing_sg = get_existing_sg( existing_data, s.get('externalId'), sg.get('type') ) - _existing_sgid = _existing_sg.get('id') if _existing_sg else None + existing_sgid = existing_sg.get('id') if existing_sg else None for analysis in sg['analyses']: if analysis['type'] not in ['cram', 'gvcf']: # Currently the create_test_subset script only handles crams or gvcf files. 
continue - _existing_analysis: dict = {} - if _existing_sgid: - _existing_analysis = _get_existing_analysis( - existing_data, - s['externalId'], - _existing_sgid, - analysis['type'], + existing_analysis: dict = {} + if existing_sgid: + existing_analysis = get_existing_analysis( + existing_data, s['externalId'], existing_sgid, analysis['type'] ) - _existing_analysis_id = ( - _existing_analysis.get('id') if _existing_analysis else None + existing_analysis_id = ( + existing_analysis.get('id') if existing_analysis else None ) - if _existing_analysis_id: + if existing_analysis_id: am = AnalysisUpdateModel( type=analysis['type'], - output=_copy_files_in_dict( + output=copy_files_in_dict( analysis['output'], project, (str(sg['id']), new_sg_map[s['externalId']][0]), @@ -431,13 +359,13 @@ def transfer_analyses( meta=analysis['meta'], ) aapi.update_analysis( - analysis_id=_existing_analysis_id, + analysis_id=existing_analysis_id, analysis_update_model=am, ) else: am = Analysis( type=analysis['type'], - output=_copy_files_in_dict( + output=copy_files_in_dict( analysis['output'], project, (str(sg['id']), new_sg_map[s['externalId']][0]), @@ -451,21 +379,31 @@ def transfer_analyses( aapi.create_analysis(project=target_project, analysis=am) -def _get_existing_sample(data: dict, sample_id: str) -> dict: - for sample in data.get('project').get('samples'): +def get_existing_sample(data: dict, sample_id: str) -> dict | None: + """ + Get the existing sample object for this ID + Returns: + The Sample dictionary, or None if unmatched + """ + for sample in data.get('project', {}).get('samples', []): if sample.get('externalId') == sample_id: return sample return None -def _get_existing_sg( +def get_existing_sg( existing_data: dict, sample_id: str, sg_type: str = None, sg_id: str = None -) -> dict: +) -> dict | None: + """ + Find a SG ID in the main data based on a sample ID + Match either on CPG ID or type (exome/genome) + Returns: + The SG Data, or None if no match is found + """ if not 
sg_type and not sg_id: raise ValueError('Must provide sg_type or sg_id when getting exsisting sg') - sample = _get_existing_sample(existing_data, sample_id) - if sample: + if sample := get_existing_sample(existing_data, sample_id): for sg in sample.get('sequencingGroups'): if sg_id and sg.get('id') == sg_id: return sg @@ -475,80 +413,71 @@ def _get_existing_sg( return None -def _get_existing_assay( +def get_existing_assay( data: dict, sample_id: str, sg_id: str, assay_type: str -) -> dict: - sg = _get_existing_sg( - existing_data=data, - sample_id=sample_id, - sg_id=sg_id, - ) - for assay in sg.get('assays'): - if assay.get('type') == assay_type: - return assay +) -> dict | None: + """ + Find assay in main data for this SGID + Returns: + The Assay Data, or None if no match is found + """ + if sg := get_existing_sg(existing_data=data, sample_id=sample_id, sg_id=sg_id): + for assay in sg.get('assays', []): + if assay.get('type') == assay_type: + return assay return None -def _get_existing_analysis( +def get_existing_analysis( data: dict, sample_id: str, sg_id: str, analysis_type: str -) -> dict: - sg = _get_existing_sg(existing_data=data, sample_id=sample_id, sg_id=sg_id) - for analysis in sg.get('analyses'): - if analysis.get('type') == analysis_type: - return analysis +) -> dict | None: + """ + Find the existing SG for this sample, then identify any relevant analysis objs + Returns: + an analysis dict, or None if the right type isn't found + """ + if sg := get_existing_sg(existing_data=data, sample_id=sample_id, sg_id=sg_id): + for analysis in sg.get('analyses', []): + if analysis.get('type') == analysis_type: + return analysis return None -def _get_sids_for_families( - project: str, - families_n: int, - additional_families, -) -> list[str]: +def get_sids_for_families( + project: str, families_n: int, additional_families: set[str] +) -> set[str]: """Returns specific sequencing groups to be included in the test project.""" - included_sids: list = [] - 
_num_families_to_subset: int = None - _randomly_selected_families: list = [] - - # Case 1: If neither families_n nor _additional_families - if not families_n and not additional_families: - return included_sids - - # Case 2: If families_n but not _additional_families - if families_n and not additional_families: - _num_families_to_subset = families_n - - # Case 3: If both families_n and _additional_families - if families_n and additional_families: - _num_families_to_subset = families_n - len(additional_families) - family_sgid_output = query(QUERY_FAMILY_SGID, {'project': project}) - # 1. Remove the families in _families_to_subset - all_family_sgids = family_sgid_output.get('project').get('families') - _filtered_family_sgids = [ - fam for fam in all_family_sgids if fam['externalId'] not in additional_families - ] - _user_input_families = [ + all_family_sgids = family_sgid_output.get('project', {}).get('families', []) + assert all_family_sgids, 'No families returned in GQL result' + + # 1. Remove the specifically requested families + user_input_families = [ fam for fam in all_family_sgids if fam['externalId'] in additional_families ] # TODO: Replace this with the nice script that randomly selects better :) - # 2. Randomly select _num_families_to_subset from the remaining families - if _num_families_to_subset: - _randomly_selected_families = random.sample( - _filtered_family_sgids, _num_families_to_subset + # 2. Randomly select from the remaining families (families_n can be 0) + user_input_families.extend( + random.sample( + [ + fam + for fam in all_family_sgids + if fam['externalId'] not in additional_families + ], + families_n, ) + ) - # 3. Combine the families in _families_to_subset with the randomly selected families & return sequencing group ids - - _all_families_to_subset = _randomly_selected_families + _user_input_families - - for fam in _all_families_to_subset: + # 3. 
Pull SGs from random + specific families + included_sids: set[str] = set() + for fam in user_input_families: for participant in fam['participants']: for sample in participant['samples']: - included_sids.append(sample['id']) + included_sids.add(sample['id']) return included_sids @@ -641,9 +570,9 @@ def transfer_participants( 'reported_gender': participant.get('reportedGender'), 'reported_sex': participant.get('reportedSex'), 'id': participant.get('id'), + 'samples': [], } # Participants are being created before the samples are, so this will be empty for now. - transfer_participant['samples'] = [] participants_to_transfer.append(transfer_participant) upserted_participants = papi.upsert_participants( @@ -659,110 +588,10 @@ def transfer_participants( return external_to_internal_participant_id_map -def get_samples_for_families(project: str, additional_families: list[str]) -> list[str]: - """Returns the samples that belong to a list of families""" - - full_pedigree = fapi.get_pedigree( - project=project, - replace_with_participant_external_ids=False, - replace_with_family_external_ids=True, - ) - - ipids = [ - family['individual_id'] - for family in full_pedigree - if family['family_id'] in additional_families - ] - - sample_objects = sapi.get_samples( - body_get_samples={ - 'project_ids': [project], - 'participant_ids': ipids, - 'active': True, - } - ) - - samples: list[str] = [sample['id'] for sample in sample_objects] - - return samples - - -def get_fams_for_samples( - project: str, - additional_samples: Optional[list[str]] = None, -) -> list[str]: - """Returns the families that a list of samples belong to""" - sample_objects = sapi.get_samples( - body_get_samples={ - 'project_ids': [project], - 'sample_ids': additional_samples, - 'active': True, - } - ) - - pids = [sample['participant_id'] for sample in sample_objects] - full_pedigree = fapi.get_pedigree( - project=project, - replace_with_participant_external_ids=False, - replace_with_family_external_ids=True, - ) - 
- fams: set[str] = { - fam['family_id'] for fam in full_pedigree if str(fam['individual_id']) in pids - } - - return list(fams) - - -def _normalise_map(unformatted_map: list[list[str]]) -> dict[str, str]: - """Input format: [[value1,key1,key2],[value2,key4]] - Output format: {key1:value1, key2: value1, key3:value2}""" - - normalised_map = {} - for group in unformatted_map: - value = group[0] - for key in group[1:]: - normalised_map[key] = value - - return normalised_map - - -def _validate_opts( - samples_n: int, families_n: int, noninteractive: bool -) -> tuple[Optional[int], Optional[int]]: - """Validates the options passed to the script""" - if samples_n is None and families_n is None: - samples_n = DEFAULT_SAMPLES_N - logger.info( - f'Neither --samples nor --families specified, defaulting to selecting ' - f'{samples_n} samples' - ) - - return samples_n, families_n - - -def _print_fam_stats(families: list[dict[str, str]]): - family_sizes = Counter([fam['family_id'] for fam in families]) - fam_by_size: CounterType[int] = Counter() - # determine number of singles, duos, trios, etc - for fam in family_sizes: - fam_by_size[family_sizes[fam]] += 1 - for fam_size in sorted(fam_by_size): - if fam_size == 1: - label = 'singles' - elif fam_size == 2: - label = 'duos' - elif fam_size == 3: - label = 'trios' - else: - label = f'{fam_size} members' - logger.info(f' {label}: {fam_by_size[fam_size]}') - - -def _get_random_families( +def get_random_families( families: list[dict[str, str]], families_n: int, - include_single_person_families: Optional[bool] = False, + include_single_person_families: bool = False, ) -> list[str]: """Obtains a subset of families, that are a little less random. By default single-person families are discarded. 
@@ -807,7 +636,7 @@ def _get_random_families( return returned_families -def _copy_files_in_dict(d, dataset: str, sid_replacement: tuple[str, str] = None): +def copy_files_in_dict(d, dataset: str, sid_replacement: tuple[str, str] = None): """ Replaces all `gs://cpg-{project}-main*/` paths into `gs://cpg-{project}-test*/` and creates copies if needed @@ -846,16 +675,12 @@ def _copy_files_in_dict(d, dataset: str, sid_replacement: tuple[str, str] = None subprocess.run(cmd, check=False, shell=True) return new_path if isinstance(d, list): - return [_copy_files_in_dict(x, dataset) for x in d] + return [copy_files_in_dict(x, dataset) for x in d] if isinstance(d, dict): - return {k: _copy_files_in_dict(v, dataset) for k, v in d.items()} + return {k: copy_files_in_dict(v, dataset) for k, v in d.items()} return d -def _pretty_format_samples(samples: list[dict[str, str]]) -> str: - return ', '.join(f"{s['id']}/{s['external_id']}" for s in samples) - - def file_exists(path: str) -> bool: """ Check if the object exists, where the object can be: @@ -874,5 +699,45 @@ def file_exists(path: str) -> bool: if __name__ == '__main__': - # pylint: disable=no-value-for-parameter - main() + parser = ArgumentParser(description='Argument parser for subset generator') + parser.add_argument( + '--project', required=True, help='The sample-metadata project ($DATASET)' + ) + parser.add_argument('-n', type=int, help='# Random Samples to copy', default=0) + parser.add_argument('-f', type=int, help='# Random families to copy', default=0) + # Flag to be used when there isn't available pedigree/family information. 
+ parser.add_argument( + '--skip-ped', + action='store_true', + help='Skip transferring pedigree/family information', + ) + parser.add_argument( + '--families', + nargs='+', + help='Additional families to include.', + type=str, + default={}, + ) + parser.add_argument( + '--samples', + nargs='+', + help='Additional samples to include.', + type=str, + default={}, + ) + parser.add_argument( + '--noninteractive', action='store_true', help='Skip interactive confirmation' + ) + args, fail = parser.parse_known_args() + if fail: + parser.print_help() + raise AttributeError(f'Invalid arguments: {fail}') + + main( + project=args.project, + samples_n=args.n, + families_n=args.f, + additional_samples=set(args.samples), + additional_families=set(args.families), + skip_ped=args.skip_ped, + ) From 81cc06eec09871f3f09ee6230b1cfbda2f796f33 Mon Sep 17 00:00:00 2001 From: vivbak Date: Mon, 25 Sep 2023 15:23:06 +1000 Subject: [PATCH 06/10] respond to feedback --- scripts/create_test_subset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index b1f2993e7..573f498f4 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -246,6 +246,7 @@ def transfer_samples_sgs_assays( Transfer samples, sequencing groups, and assays from the original project to the test project. 
""" + logging.info('Transferring samples, sequencing groups, and assays') for s in samples: sample_sgs: list[SequencingGroupUpsert] = [] for sg in s.get('sequencingGroups'): @@ -562,7 +563,6 @@ def transfer_participants( if participant['externalId'] not in target_project_epids: # Participants with id field will be updated & those without will be inserted del participant['id'] - # transfer_participant = {k: v for k, v in participant.items() if v is not None} transfer_participant = { 'external_id': participant['externalId'], 'meta': participant.get('meta') or {}, From 7380254b7c8cabdc7d8586ace7eb63339d023bec Mon Sep 17 00:00:00 2001 From: vivbak Date: Tue, 26 Sep 2023 09:33:55 +1000 Subject: [PATCH 07/10] Fixes haha --- scripts/create_test_subset.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 573f498f4..818c3e46b 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -153,7 +153,7 @@ PARTICIPANT_QUERY = gql( """ query ($project: String!) 
{ - project (externalId: $project) { + project (name: $project) { participants { id externalId @@ -204,12 +204,12 @@ def main( # Pull Participant Data participant_data = [] - participant_ids: list = [] + internal_participant_ids: list = [] for sg in original_project_subset_data.get('project').get('samples'): participant = sg.get('participant') if participant: participant_data.append(participant) - participant_ids.append(participant.get('externalId')) + internal_participant_ids.append(participant.get('id')) # Populating test project target_project = project + '-test' @@ -223,7 +223,9 @@ def main( ) else: - family_ids = transfer_families(project, target_project, participant_ids) + family_ids = transfer_families( + project, target_project, internal_participant_ids + ) upserted_participant_map = transfer_ped(project, target_project, family_ids) existing_data = query(EXISTING_DATA_QUERY, {'project': target_project}) @@ -484,12 +486,12 @@ def get_sids_for_families( def transfer_families( - initial_project: str, target_project: str, participant_ids: list[int] + initial_project: str, target_project: str, internal_participant_ids: list[int] ) -> list[int]: """Pull relevant families from the input project, and copy to target_project""" families = fapi.get_families( project=initial_project, - participant_ids=participant_ids, + participant_ids=internal_participant_ids, ) family_ids = [family['id'] for family in families] From 3e7179f520a8987a7c431f010635c3d3b8d445ab Mon Sep 17 00:00:00 2001 From: vivbak Date: Tue, 26 Sep 2023 11:56:50 +1000 Subject: [PATCH 08/10] remove gross for-loop + fixes --- scripts/create_test_subset.py | 98 ++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 41 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 818c3e46b..785c2743a 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -250,44 +250,6 @@ def transfer_samples_sgs_assays( """ logging.info('Transferring 
samples, sequencing groups, and assays') for s in samples: - sample_sgs: list[SequencingGroupUpsert] = [] - for sg in s.get('sequencingGroups'): - sg_assays: list[AssayUpsert] = [] - existing_sg = get_existing_sg( - existing_data, s.get('externalId'), sg.get('type') - ) - _existing_sgid = existing_sg.get('id') if existing_sg else None - for assay in sg.get('assays'): - _existing_assay: dict[str, str] = {} - if _existing_sgid: - _existing_assay = get_existing_assay( - existing_data, - s.get('externalId'), - _existing_sgid, - assay.get('type'), - ) - existing_assay_id = ( - _existing_assay.get('id') if _existing_assay else None - ) - assay_upsert = AssayUpsert( - type=assay.get('type'), - id=existing_assay_id, - external_ids=assay.get('externalIds') or {}, - # sample_id=self.s, - meta=assay.get('meta'), - ) - sg_assays.append(assay_upsert) - sg_upsert = SequencingGroupUpsert( - id=_existing_sgid, - external_ids=sg.get('externalIds') or {}, - meta=sg.get('meta'), - platform=sg.get('platform'), - technology=sg.get('technology'), - type=sg.get('type'), - assays=sg_assays, - ) - sample_sgs.append(sg_upsert) - sample_type = None if s['type'] == 'None' else s['type'] existing_sid: str | None = None existing_sample = get_existing_sample(existing_data, s['externalId']) @@ -303,7 +265,7 @@ def transfer_samples_sgs_assays( type=sample_type or None, meta=(copy_files_in_dict(s['meta'], project) or {}), participant_id=existing_pid, - sequencing_groups=sample_sgs, + sequencing_groups=upsert_sequencing_groups(s, existing_data), id=existing_sid, ) @@ -315,6 +277,60 @@ def transfer_samples_sgs_assays( ) +def upsert_sequencing_groups( + sample: dict, existing_data: dict +) -> list[SequencingGroupUpsert]: + """Create SG Upsert Objects for a sample""" + sgs_to_upsert: list[SequencingGroupUpsert] = [] + for sg in sample.get('sequencingGroups'): + existing_sg = get_existing_sg( + existing_data, sample.get('externalId'), sg.get('type') + ) + existing_sgid = existing_sg.get('id') if 
existing_sg else None + sg_upsert = SequencingGroupUpsert( + id=existing_sgid, + external_ids=sg.get('externalIds') or {}, + meta=sg.get('meta'), + platform=sg.get('platform'), + technology=sg.get('technology'), + type=sg.get('type'), + assays=upsert_assays( + sg, existing_sgid, existing_data, sample.get('externalId') + ), + ) + sgs_to_upsert.append(sg_upsert) + + return sgs_to_upsert + + +def upsert_assays( + sg: dict, existing_sgid: str | None, existing_data: dict, sample_external_id +) -> list[AssayUpsert]: + """Create Assay Upsert Objects for a sequencing group""" + print(sg) + assays_to_upsert: list[AssayUpsert] = [] + _existing_assay: dict[str, str] = {} + for assay in sg.get('assays'): + # Check if assay exists + if existing_sgid: + _existing_assay = get_existing_assay( + existing_data, + sample_external_id, + existing_sgid, + assay.get('type'), + ) + existing_assay_id = _existing_assay.get('id') if _existing_assay else None + assay_upsert = AssayUpsert( + type=assay.get('type'), + id=existing_assay_id, + external_ids=assay.get('externalIds') or {}, + meta=assay.get('meta'), + ) + + assays_to_upsert.append(assay_upsert) + return assays_to_upsert + + def transfer_analyses( samples: dict, existing_data: dict, target_project: str, project: str ): @@ -405,7 +421,7 @@ def get_existing_sg( The SG Data, or None if no match is found """ if not sg_type and not sg_id: - raise ValueError('Must provide sg_type or sg_id when getting exsisting sg') + raise ValueError('Must provide sg_type or sg_id when getting existing sg') if sample := get_existing_sample(existing_data, sample_id): for sg in sample.get('sequencingGroups'): if sg_id and sg.get('id') == sg_id: @@ -420,7 +436,7 @@ def get_existing_assay( data: dict, sample_id: str, sg_id: str, assay_type: str ) -> dict | None: """ - Find assay in main data for this SGID + Find assay in main data for this sample Returns: The Assay Data, or None if no match is found """ From 071ed4c0310f0d0984dbd81b41dbf5537737faa1 Mon Sep 17 
00:00:00 2001 From: vivbak Date: Tue, 26 Sep 2023 12:20:51 +1000 Subject: [PATCH 09/10] respond to feedback --- scripts/create_test_subset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 785c2743a..1f50fc8f8 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -205,7 +205,7 @@ def main( # Pull Participant Data participant_data = [] internal_participant_ids: list = [] - for sg in original_project_subset_data.get('project').get('samples'): + for sg in original_project_subset_data.get('project').get('samples', []): participant = sg.get('participant') if participant: participant_data.append(participant) @@ -466,7 +466,7 @@ def get_existing_analysis( def get_sids_for_families( project: str, families_n: int, additional_families: set[str] ) -> set[str]: - """Returns specific sequencing groups to be included in the test project.""" + """Returns specific samples to be included in the test project.""" family_sgid_output = query(QUERY_FAMILY_SGID, {'project': project}) From 7eee023fae436aebb83f1cb348822fac657f292a Mon Sep 17 00:00:00 2001 From: vivbak Date: Tue, 26 Sep 2023 13:08:08 +1000 Subject: [PATCH 10/10] Add more logs --- scripts/create_test_subset.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 1f50fc8f8..7a7a964fd 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -182,6 +182,7 @@ def main( raise ValueError('Come on, what exactly are you asking for?') # for reproducibility + logger.info('Setting random seed to 42') random.seed(42) # 1. Find and SG IDs to be moved by Family ID -test. @@ -191,13 +192,16 @@ def main( ) # 2. Get all sids in project. 
+ logger.info(f'Querying all sids in {project}') sid_output = query(SG_ID_QUERY, variables={'project': project}) all_sids = {sid['id'] for sid in sid_output.get('project').get('samples')} + logger.info(f'Found {len(all_sids)} sids in {project}') # 3. Randomly select from the remaining sgs additional_samples.update(random.sample(all_sids - additional_samples, samples_n)) # 4. Query all the samples from the selected sgs + logger.info(f'Transferring {len(additional_samples)} samples. Querying metadata.') original_project_subset_data = query( QUERY_ALL_DATA, {'project': project, 'sids': list(additional_samples)} ) @@ -217,12 +221,17 @@ def main( # Parse Families & Participants if skip_ped: # If no family data is available, only the participants should be transferred. + logger.info( + 'Skipping pedigree/family information. Transferring participants only.' + ) + logger.info(f'Transferring {len(participant_data)} participants. ') upserted_participant_map = transfer_participants( target_project=target_project, participant_data=participant_data, ) else: + logger.info(f'Transferring {len(participant_data)} participants. ') family_ids = transfer_families( project, target_project, internal_participant_ids ) @@ -230,11 +239,14 @@ def main( existing_data = query(EXISTING_DATA_QUERY, {'project': target_project}) + logger.info('Transferring samples, sequencing groups, and assays') samples = original_project_subset_data.get('project').get('samples') transfer_samples_sgs_assays( samples, existing_data, upserted_participant_map, target_project, project ) + logger.info('Transferring analyses') transfer_analyses(samples, existing_data, target_project, project) + logger.info('Subset generation complete!') def transfer_samples_sgs_assays( @@ -248,7 +260,7 @@ def transfer_samples_sgs_assays( Transfer samples, sequencing groups, and assays from the original project to the test project. 
""" - logging.info('Transferring samples, sequencing groups, and assays') + logger.info(f'Transferring {len(samples)} samples') for s in samples: sample_type = None if s['type'] == 'None' else s['type'] existing_sid: str | None = None