diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 117f081c6..7a7a964fd 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -1,9 +1,5 @@ #!/usr/bin/env python3 -# type: ignore -# pylint: skip-file - - -# # pylint: disable=too-many-instance-attributes,too-many-locals,unused-argument,wrong-import-order,unused-argument,too-many-arguments +# pylint: disable=too-many-instance-attributes,too-many-locals """ Example Invocation @@ -20,23 +16,23 @@ import os import random import subprocess -import traceback +from argparse import ArgumentParser from collections import Counter -from typing import Optional -import click from google.cloud import storage -from metamist import exceptions -from metamist.apis import AnalysisApi, AssayApi, FamilyApi, ParticipantApi, SampleApi +from metamist.apis import AnalysisApi, AssayApi, SampleApi, FamilyApi, ParticipantApi +from metamist.graphql import gql, query from metamist.models import ( - Analysis, - AnalysisStatus, AssayUpsert, - BodyGetAssaysByCriteria, SampleUpsert, + Analysis, + AnalysisStatus, + AnalysisUpdateModel, + SequencingGroupUpsert, ) + logger = logging.getLogger(__file__) logging.basicConfig(format='%(levelname)s (%(name)s %(lineno)s): %(message)s') logger.setLevel(logging.INFO) @@ -49,285 +45,481 @@ DEFAULT_SAMPLES_N = 10 - -@click.command() -@click.option( - '--project', - required=True, - help='The metamist project ($DATASET)', -) -@click.option( - '-n', - '--samples', - 'samples_n', - type=int, - help='Number of samples to subset', -) -@click.option( - '--families', - 'families_n', - type=int, - help='Minimal number of families to include', +QUERY_ALL_DATA = gql( + """ + query getAllData($project: String!, $sids: [String!]) { + project(name: $project) { + samples(id: {in_: $sids}) { + id + meta + type + externalId + participant { + externalId + id + karyotype + meta + reportedGender + reportedSex + } + sequencingGroups{ + id + meta + platform + technology + type + assays { + id + meta + type + } + analyses { + active + id + meta + output + status + timestampCompleted + type + } + } + } + } + } + """ ) -# Flag to be used when there isn't available pedigree/family information. -@click.option( - '--skip-ped', - 'skip_ped', - is_flag=True, - default=False, - help='Skip transferring pedigree/family information', + +# TODO: We can change this to filter external sample ids +EXISTING_DATA_QUERY = gql( + """ + query getExistingData($project: String!) { + project(name: $project) { + samples{ + id + externalId + sequencingGroups { + id + type + assays { + id + type + } + analyses { + id + type + } + } + } + } + } + """ ) -@click.option( - '--add-family', - 'additional_families', - type=str, - multiple=True, - help="""Additional families to include. - All samples from these fams will be included. - This is in addition to the number of families specified in - --families and the number of samples specified in -n""", + +QUERY_FAMILY_SGID = gql( + """ + query FamilyQuery($project: String!) { + project(name: $project) { + families { + id + externalId + participants { + samples { + id + } + } + } + + } + } +""" ) -@click.option( - '--add-sample', - 'additional_samples', - type=str, - multiple=True, - help="""Additional samples to include. - This is in addition to the number of families specified in - --families and the number of samples specified in -n""", + +SG_ID_QUERY = gql( + """ + query getSGIds($project: String!) { + project(name: $project) { + samples{ + id + externalId + sequencingGroups { + id + } + } + } + } + """ ) -@click.option( - '--noninteractive', - 'noninteractive', - is_flag=True, - default=False, - help='Skip interactive confirmation', + +PARTICIPANT_QUERY = gql( + """ + query ($project: String!) { + project (name: $project) { + participants { + id + externalId + } + } + } + """ ) + + def main( project: str, - samples_n: Optional[int], - families_n: Optional[int], - skip_ped: Optional[bool] = True, - additional_families: Optional[tuple[str]] = None, - additional_samples: Optional[tuple[str]] = None, - noninteractive: Optional[bool] = False, + samples_n: int, + families_n: int, + additional_families: set[str], + additional_samples: set[str], + skip_ped: bool = True, ): """ Script creates a test subset for a given project. A new project with a prefix -test is created, and for any files in sample/meta, sequence/meta, or analysis/output a copy in the -test namespace is created. """ - samples_n, families_n = _validate_opts(samples_n, families_n, noninteractive) - _additional_families: list[str] = list(additional_families) - _additional_samples: list[str] = list(additional_samples) - - all_samples = sapi.get_samples( - body_get_samples={ - 'project_ids': [project], - 'active': True, - } - ) - logger.info(f'Found {len(all_samples)} samples') - if (samples_n and samples_n >= len(all_samples)) and not noninteractive: - resp = str( - input( - f'Requesting {samples_n} samples which is >= ' - f'than the number of available samples ({len(all_samples)}). ' - f'The test project will be a copy of the production project. ' - f'Please confirm (y): ' - ) - ) - if resp.lower() != 'y': - raise SystemExit() - random.seed(42) # for reproducibility + if not any([additional_families, additional_samples, samples_n, families_n]): + raise ValueError('Come on, what exactly are you asking for?') - pid_sid = papi.get_external_participant_id_to_internal_sample_id(project) - sample_id_by_participant_id = dict(pid_sid) + # for reproducibility + logger.info('Setting random seed to 42') + random.seed(42) - if families_n: - if _additional_samples: - _additional_families.extend( - get_fams_for_samples( - project, - _additional_samples, - ) - ) - fams = fapi.get_families(project) - _additional_families_set = set(_additional_families) - families_to_subset = [ - family['id'] - for family in fams - if family['external_id'] not in _additional_families_set - ] - full_pedigree = fapi.get_pedigree( - project=project, internal_family_ids=families_to_subset - ) - external_family_ids = _get_random_families(full_pedigree, families_n) - external_family_ids.extend(_additional_families) - internal_family_ids = [ - fam['id'] for fam in fams if fam['external_id'] in external_family_ids - ] - pedigree = fapi.get_pedigree( - project=project, internal_family_ids=internal_family_ids + # 1. Find and SG IDs to be moved by Family ID -test. + if families_n or additional_families: + additional_samples.update( + get_sids_for_families(project, families_n, additional_families) ) - _print_fam_stats(pedigree) - p_ids = [ped['individual_id'] for ped in pedigree] - sample_ids = [ - sample - for (participant, sample) in sample_id_by_participant_id.items() - if participant in p_ids - ] - sample_set = set(sample_ids) - samples = [s for s in all_samples if s['id'] in sample_set] + # 2. Get all sids in project. + logger.info(f'Querying all sids in {project}') + sid_output = query(SG_ID_QUERY, variables={'project': project}) + all_sids = {sid['id'] for sid in sid_output.get('project').get('samples')} + logger.info(f'Found {len(all_sids)} sids in {project}') - else: - if _additional_families: - _additional_samples.extend( - get_samples_for_families(project, _additional_families) - ) + # 3. Randomly select from the remaining sgs + additional_samples.update(random.sample(all_sids - additional_samples, samples_n)) - _additional_samples_set = set(_additional_samples) - samples_to_subset = [ - sample - for sample in all_samples - if sample['id'] not in _additional_samples_set - ] - samples_to_add = [ - sample for sample in all_samples if sample['id'] in _additional_samples_set - ] - samples = random.sample(list(samples_to_subset), samples_n) - samples.extend(samples_to_add) - sample_ids = [s['id'] for s in samples] - - logger.info( - f'Subset to {len(samples)} samples (internal ID / external ID): ' - f'{_pretty_format_samples(samples)}' + # 4. Query all the samples from the selected sgs + logger.info(f'Transfering {len(additional_samples)} samples. Querying metadata.') + original_project_subset_data = query( + QUERY_ALL_DATA, {'project': project, 'sids': list(additional_samples)} ) + # Pull Participant Data + participant_data = [] + internal_participant_ids: list = [] + for sg in original_project_subset_data.get('project').get('samples', []): + participant = sg.get('participant') + if participant: + participant_data.append(participant) + internal_participant_ids.append(participant.get('id')) + # Populating test project target_project = project + '-test' - logger.info('Checking any existing test samples in the target test project') - - test_sample_by_external_id = _process_existing_test_samples(target_project, samples) - - try: - seq_infos: list[dict[str, str]] = assayapi.get_assays_by_criteria( - body_get_assays_by_criteria=BodyGetAssaysByCriteria( - sample_ids=sample_ids, - ) - ) - except exceptions.ApiException: - seq_info_by_s_id = {} - else: - seq_info_by_s_id = {seq['sample_id']: seq for seq in seq_infos} - - analysis_by_sid_by_type: dict[str, dict[str, dict[str, str]]] = { - 'cram': {}, - 'gvcf': {}, - } - for a_type, analysis_by_sid in analysis_by_sid_by_type.items(): - try: - # TODO: fix this - analyses: list[ - dict[str, str] - ] = aapi.get_latest_analysis_for_samples_and_type( - project=project, - analysis_type=str(a_type), - request_body=sample_ids, - ) - except exceptions.ApiException: - traceback.print_exc() - else: - for a in analyses: - analysis_by_sid[a['sample_ids'][0]] = a - logger.info(f'Will copy {a_type} analysis entries: {analysis_by_sid}') # Parse Families & Participants - participant_ids = [int(sample['participant_id']) for sample in samples] if skip_ped: # If no family data is available, only the participants should be transferred. - external_participant_ids = transfer_participants( - initial_project=project, + logger.info( + 'Skipping pedigree/family information. Transferring participants only.' + ) + logger.info(f'Transferring {len(participant_data)} participants. ') + upserted_participant_map = transfer_participants( target_project=target_project, - participant_ids=participant_ids, + participant_data=participant_data, ) else: - family_ids = transfer_families(project, target_project, participant_ids) - external_participant_ids = transfer_ped(project, target_project, family_ids) + logger.info(f'Transferring {len(participant_data)} participants. ') + family_ids = transfer_families( + project, target_project, internal_participant_ids + ) + upserted_participant_map = transfer_ped(project, target_project, family_ids) - external_sample_internal_participant_map = get_map_ipid_esid( - project, target_project, external_participant_ids + existing_data = query(EXISTING_DATA_QUERY, {'project': target_project}) + + logger.info('Transferring samples, sequencing groups, and assays') + samples = original_project_subset_data.get('project').get('samples') + transfer_samples_sgs_assays( + samples, existing_data, upserted_participant_map, target_project, project ) + logger.info('Transferring analyses') + transfer_analyses(samples, existing_data, target_project, project) + logger.info('Subset generation complete!') - new_sample_map = {} +def transfer_samples_sgs_assays( + samples: dict, + existing_data: dict, + upserted_participant_map: dict[str, int], + target_project: str, + project: str, +): + """ + Transfer samples, sequencing groups, and assays from the original project to the + test project. + """ + logger.info(f'Transferring {len(samples)} samples') for s in samples: + sample_type = None if s['type'] == 'None' else s['type'] + existing_sid: str | None = None + existing_sample = get_existing_sample(existing_data, s['externalId']) + if existing_sample: + existing_sid = existing_sample['id'] + + existing_pid: int | None = None + if s['participant']: + existing_pid = upserted_participant_map[s['participant']['externalId']] + + sample_upsert = SampleUpsert( + external_id=s['externalId'], + type=sample_type or None, + meta=(copy_files_in_dict(s['meta'], project) or {}), + participant_id=existing_pid, + sequencing_groups=upsert_sequencing_groups(s, existing_data), + id=existing_sid, + ) + logger.info(f'Processing sample {s["id"]}') + logger.info('Creating test sample entry') + sapi.create_sample( + project=target_project, + sample_upsert=sample_upsert, + ) + + +def upsert_sequencing_groups( + sample: dict, existing_data: dict +) -> list[SequencingGroupUpsert]: + """Create SG Upsert Objects for a sample""" + sgs_to_upsert: list[SequencingGroupUpsert] = [] + for sg in sample.get('sequencingGroups'): + existing_sg = get_existing_sg( + existing_data, sample.get('externalId'), sg.get('type') + ) + existing_sgid = existing_sg.get('id') if existing_sg else None + sg_upsert = SequencingGroupUpsert( + id=existing_sgid, + external_ids=sg.get('externalIds') or {}, + meta=sg.get('meta'), + platform=sg.get('platform'), + technology=sg.get('technology'), + type=sg.get('type'), + assays=upsert_assays( + sg, existing_sgid, existing_data, sample.get('externalId') + ), + ) + sgs_to_upsert.append(sg_upsert) + + return sgs_to_upsert + + +def upsert_assays( + sg: dict, existing_sgid: str | None, existing_data: dict, sample_external_id +) -> list[AssayUpsert]: + """Create Assay Upsert Objects for a sequencing group""" + print(sg) + assays_to_upsert: list[AssayUpsert] = [] + _existing_assay: dict[str, str] = {} + for assay in sg.get('assays'): + # Check if assay exists + if existing_sgid: + _existing_assay = get_existing_assay( + existing_data, + sample_external_id, + existing_sgid, + assay.get('type'), + ) + existing_assay_id = _existing_assay.get('id') if _existing_assay else None + assay_upsert = AssayUpsert( + type=assay.get('type'), + id=existing_assay_id, + external_ids=assay.get('externalIds') or {}, + meta=assay.get('meta'), + ) + + assays_to_upsert.append(assay_upsert) + return assays_to_upsert + - if s['external_id'] in test_sample_by_external_id: - new_s_id = test_sample_by_external_id[s['external_id']]['id'] - logger.info(f'Sample already in test project, with ID {new_s_id}') - new_sample_map[s['id']] = new_s_id - - else: - logger.info('Creating test sample entry') - new_s_id = sapi.create_sample( - project=target_project, - sample_upsert=SampleUpsert( - external_id=s['external_id'], - type=s['type'], - meta=(_copy_files_in_dict(s['meta'], project) or {}), - participant_id=external_sample_internal_participant_map[ - s['external_id'] - ], - ), +def transfer_analyses( + samples: dict, existing_data: dict, target_project: str, project: str +): + """ + This function will transfer the analyses from the original project to the test project. + """ + new_sg_data = query(SG_ID_QUERY, {'project': target_project}) + + new_sg_map = {} + for s in new_sg_data.get('project').get('samples'): + sg_ids: list = [] + for sg in s.get('sequencingGroups'): + sg_ids.append(sg.get('id')) + new_sg_map[s.get('externalId')] = sg_ids + + for s in samples: + for sg in s['sequencingGroups']: + existing_sg = get_existing_sg( + existing_data, s.get('externalId'), sg.get('type') ) - new_sample_map[s['id']] = new_s_id - - seq_info = seq_info_by_s_id.get(s['id']) - if seq_info: - logger.info('Processing sequence entry') - new_meta = _copy_files_in_dict(seq_info.get('meta'), project) - logger.info('Creating sequence entry in test') - assayapi.create_assay( - assay_upsert=AssayUpsert( - sample_id=new_s_id, - meta=new_meta, - type=seq_info['type'], - external_ids=seq_info['external_ids'], - ), + existing_sgid = existing_sg.get('id') if existing_sg else None + for analysis in sg['analyses']: + if analysis['type'] not in ['cram', 'gvcf']: + # Currently the create_test_subset script only handles crams or gvcf files. + continue + + existing_analysis: dict = {} + if existing_sgid: + existing_analysis = get_existing_analysis( + existing_data, s['externalId'], existing_sgid, analysis['type'] + ) + existing_analysis_id = ( + existing_analysis.get('id') if existing_analysis else None ) + if existing_analysis_id: + am = AnalysisUpdateModel( + type=analysis['type'], + output=copy_files_in_dict( + analysis['output'], + project, + (str(sg['id']), new_sg_map[s['externalId']][0]), + ), + status=AnalysisStatus(analysis['status'].lower()), + sequencing_group_ids=new_sg_map[s['externalId']], + meta=analysis['meta'], + ) + aapi.update_analysis( + analysis_id=existing_analysis_id, + analysis_update_model=am, + ) + else: + am = Analysis( + type=analysis['type'], + output=copy_files_in_dict( + analysis['output'], + project, + (str(sg['id']), new_sg_map[s['externalId']][0]), + ), + status=AnalysisStatus(analysis['status'].lower()), + sequencing_group_ids=new_sg_map[s['externalId']], + meta=analysis['meta'], + ) + + logger.info(f'Creating {analysis["type"]}analysis entry in test') + aapi.create_analysis(project=target_project, analysis=am) + + +def get_existing_sample(data: dict, sample_id: str) -> dict | None: + """ + Get the existing sample object for this ID + Returns: + The Sample dictionary, or None if unmatched + """ + for sample in data.get('project', {}).get('samples', []): + if sample.get('externalId') == sample_id: + return sample - for a_type in ['cram', 'gvcf']: - analysis = analysis_by_sid_by_type[a_type].get(s['id']) - if analysis: - logger.info(f'Processing {a_type} analysis entry') - am = Analysis( - type=str(a_type), - output=_copy_files_in_dict( - analysis['output'], - project, - (str(s['id']), new_sample_map[s['id']]), - ), - status=AnalysisStatus(analysis['status']), - sample_ids=[new_sample_map[s['id']]], - meta=analysis['meta'], - ) - logger.info(f'Creating {a_type} analysis entry in test') - aapi.create_analysis(project=target_project, analysis=am) - logger.info('-') + return None + + +def get_existing_sg( + existing_data: dict, sample_id: str, sg_type: str = None, sg_id: str = None +) -> dict | None: + """ + Find a SG ID in the main data based on a sample ID + Match either on CPG ID or type (exome/genome) + Returns: + The SG Data, or None if no match is found + """ + if not sg_type and not sg_id: + raise ValueError('Must provide sg_type or sg_id when getting existing sg') + if sample := get_existing_sample(existing_data, sample_id): + for sg in sample.get('sequencingGroups'): + if sg_id and sg.get('id') == sg_id: + return sg + if sg_type and sg.get('type') == sg_type: + return sg + + return None + + +def get_existing_assay( + data: dict, sample_id: str, sg_id: str, assay_type: str +) -> dict | None: + """ + Find assay in main data for this sample + Returns: + The Assay Data, or None if no match is found + """ + if sg := get_existing_sg(existing_data=data, sample_id=sample_id, sg_id=sg_id): + for assay in sg.get('assays', []): + if assay.get('type') == assay_type: + return assay + + return None + + +def get_existing_analysis( + data: dict, sample_id: str, sg_id: str, analysis_type: str +) -> dict | None: + """ + Find the existing SG for this sample, then identify any relevant analysis objs + Returns: + an analysis dict, or None if the right type isn't found + """ + if sg := get_existing_sg(existing_data=data, sample_id=sample_id, sg_id=sg_id): + for analysis in sg.get('analyses', []): + if analysis.get('type') == analysis_type: + return analysis + return None + + +def get_sids_for_families( + project: str, families_n: int, additional_families: set[str] +) -> set[str]: + """Returns specific samples to be included in the test project.""" + + family_sgid_output = query(QUERY_FAMILY_SGID, {'project': project}) + + all_family_sgids = family_sgid_output.get('project', {}).get('families', []) + assert all_family_sgids, 'No families returned in GQL result' + + # 1. Remove the specifically requested families + user_input_families = [ + fam for fam in all_family_sgids if fam['externalId'] in additional_families + ] + + # TODO: Replace this with the nice script that randomly selects better :) + # 2. Randomly select from the remaining families (families_n can be 0) + user_input_families.extend( + random.sample( + [ + fam + for fam in all_family_sgids + if fam['externalId'] not in additional_families + ], + families_n, + ) + ) + + # 3. Pull SGs from random + specific families + included_sids: set[str] = set() + for fam in user_input_families: + for participant in fam['participants']: + for sample in participant['samples']: + included_sids.add(sample['id']) + + return included_sids def transfer_families( - initial_project: str, target_project: str, participant_ids: list[int] + initial_project: str, target_project: str, internal_participant_ids: list[int] ) -> list[int]: """Pull relevant families from the input project, and copy to target_project""" families = fapi.get_families( project=initial_project, - participant_ids=participant_ids, + participant_ids=internal_participant_ids, ) family_ids = [family['id'] for family in families] @@ -355,20 +547,13 @@ def transfer_families( def transfer_ped( initial_project: str, target_project: str, family_ids: list[int] -) -> list[str]: +) -> dict[str, int]: """Pull pedigree from the input project, and copy to target_project""" ped_tsv = fapi.get_pedigree( initial_project, export_type='tsv', internal_family_ids=family_ids, ) - ped_json = fapi.get_pedigree( - initial_project, - export_type='json', - internal_family_ids=family_ids, - ) - - external_participant_ids = [ped['individual_id'] for ped in ped_json] tmp_ped_tsv = 'tmp_ped.tsv' # Work-around as import_pedigree takes a file. with open(tmp_ped_tsv, 'w') as tmp_ped: @@ -382,18 +567,21 @@ def transfer_ped( create_missing_participants=True, ) - return external_participant_ids + # Get map of external participant id to internal + participant_output = query(PARTICIPANT_QUERY, {'project': target_project}) + participant_map = { + participant['externalId']: participant['id'] + for participant in participant_output.get('project').get('participants') + } + + return participant_map def transfer_participants( - initial_project: str, target_project: str, participant_ids: list[int] -) -> list[str]: + target_project: str, + participant_data, +) -> dict[str, int]: """Transfers relevant participants between projects""" - - current_participants = papi.get_participants( - initial_project, - body_get_participants={'internal_participant_ids': participant_ids}, - ) existing_participants = papi.get_participants(target_project) target_project_epids = [ @@ -401,192 +589,39 @@ def transfer_participants( ] participants_to_transfer = [] - for participant in current_participants: - if participant['external_id'] not in target_project_epids: + for participant in participant_data: + if participant['externalId'] not in target_project_epids: # Participants with id field will be updated & those without will be inserted del participant['id'] - transfer_participant = {k: v for k, v in participant.items() if v is not None} + transfer_participant = { + 'external_id': participant['externalId'], + 'meta': participant.get('meta') or {}, + 'karyotype': participant.get('karyotype'), + 'reported_gender': participant.get('reportedGender'), + 'reported_sex': participant.get('reportedSex'), + 'id': participant.get('id'), + 'samples': [], + } # Participants are being created before the samples are, so this will be empty for now. - transfer_participant['samples'] = [] participants_to_transfer.append(transfer_participant) upserted_participants = papi.upsert_participants( target_project, participant_upsert=participants_to_transfer ) - return list(upserted_participants.keys()) - - -def get_map_ipid_esid( - project: str, target_project: str, external_participant_ids: list[str] -) -> dict[str, str]: - """Intermediate steps to determine the mapping of esid to ipid - Acronyms - ep : external participant id - ip : internal participant id - es : external sample id - is : internal sample id - """ - - # External PID: Internal PID - ep_ip_map = papi.get_participant_id_map_by_external_ids( - target_project, request_body=external_participant_ids - ) - - # External PID : Internal SID - ep_is_map = papi.get_external_participant_id_to_internal_sample_id(project) - - # Internal PID : Internal SID - ip_is_map = [] - for ep_is_pair in ep_is_map: - if ep_is_pair[0] in ep_ip_map: - ep_is_pair[0] = ep_ip_map[ep_is_pair[0]] - ip_is_map.append(ep_is_pair) - - # Internal PID : External SID - is_es_map = sapi.get_all_sample_id_map_by_internal(project) - - ip_es_map = [] - for ip_is_pair in ip_is_map: - samples_per_participant = [ip_is_pair[0]] - for isid in ip_is_pair[1:]: - if isid in is_es_map: - samples_per_participant.append(is_es_map[isid]) - ip_es_map.append(samples_per_participant) - - # External SID : Internal PID (Normalised) - external_sample_internal_participant_map = _normalise_map(ip_es_map) - - return external_sample_internal_participant_map - - -def get_samples_for_families(project: str, additional_families: list[str]) -> list[str]: - """Returns the samples that belong to a list of families""" - - full_pedigree = fapi.get_pedigree( - project=project, - replace_with_participant_external_ids=False, - replace_with_family_external_ids=True, - ) - - ipids = [ - family['individual_id'] - for family in full_pedigree - if family['family_id'] in additional_families - ] - - sample_objects = sapi.get_samples( - body_get_samples={ - 'project_ids': [project], - 'participant_ids': ipids, - 'active': True, - } - ) - - samples: list[str] = [sample['id'] for sample in sample_objects] - - return samples - - -def get_fams_for_samples( - project: str, - additional_samples: Optional[list[str]] = None, -) -> list[str]: - """Returns the families that a list of samples belong to""" - sample_objects = sapi.get_samples( - body_get_samples={ - 'project_ids': [project], - 'sample_ids': additional_samples, - 'active': True, - } - ) - pids = [sample['participant_id'] for sample in sample_objects] - full_pedigree = fapi.get_pedigree( - project=project, - replace_with_participant_external_ids=False, - replace_with_family_external_ids=True, - ) - - fams: set[str] = { - fam['family_id'] for fam in full_pedigree if str(fam['individual_id']) in pids - } + external_to_internal_participant_id_map: dict[str, int] = {} - return list(fams) + for participant in upserted_participants: + external_to_internal_participant_id_map[ + participant['external_id'] + ] = participant['id'] + return external_to_internal_participant_id_map -def _normalise_map(unformatted_map: list[list[str]]) -> dict[str, str]: - """Input format: [[value1,key1,key2],[value2,key4]] - Output format: {key1:value1, key2: value1, key3:value2}""" - - normalised_map = {} - for group in unformatted_map: - value = group[0] - for key in group[1:]: - normalised_map[key] = value - - return normalised_map - - -def _validate_opts( - samples_n: int, families_n: int, noninteractive: bool -) -> tuple[Optional[int], Optional[int]]: - if samples_n is not None and families_n is not None: - raise click.BadParameter('Please specify only one of --samples or --families') - - if samples_n is None and families_n is None: - samples_n = DEFAULT_SAMPLES_N - logger.info( - f'Neither --samples nor --families specified, defaulting to selecting ' - f'{samples_n} samples' - ) - - if families_n is not None and families_n < 1: - raise click.BadParameter('Please specify --families higher than 0') - - if (families_n is not None and families_n >= 30) and not noninteractive: - resp = str( - input( - f'You requested a subset of {families_n} families. ' - f'Please confirm (y): ' - ) - ) - if resp.lower() != 'y': - raise SystemExit() - - if (samples_n is not None and samples_n >= 100) and not noninteractive: - resp = str( - input( - f'You requested a subset of {samples_n} samples. ' - f'Please confirm (y): ' - ) - ) - if resp.lower() != 'y': - raise SystemExit() - return samples_n, families_n - - -def _print_fam_stats(families: list[dict[str, str]]): - family_sizes = Counter([fam['family_id'] for fam in families]) - fam_by_size: Counter[int] = Counter() - # determine number of singles, duos, trios, etc - for fam in family_sizes: - fam_by_size[family_sizes[fam]] += 1 - for fam_size in sorted(fam_by_size): - if fam_size == 1: - label = 'singles' - elif fam_size == 2: - label = 'duos' - elif fam_size == 3: - label = 'trios' - else: - label = f'{fam_size} members' - logger.info(f' {label}: {fam_by_size[fam_size]}') - - -def _get_random_families( +def get_random_families( families: list[dict[str, str]], families_n: int, - include_single_person_families: Optional[bool] = False, + include_single_person_families: bool = False, ) -> list[str]: """Obtains a subset of families, that are a little less random. By default single-person families are discarded. @@ -631,7 +666,7 @@ def _get_random_families( return returned_families -def _copy_files_in_dict(d, dataset: str, sid_replacement: tuple[str, str] = None): +def copy_files_in_dict(d, dataset: str, sid_replacement: tuple[str, str] = None): """ Replaces all `gs://cpg-{project}-main*/` paths into `gs://cpg-{project}-test*/` and creates copies if needed @@ -670,48 +705,12 @@ def _copy_files_in_dict(d, dataset: str, sid_replacement: tuple[str, str] = None subprocess.run(cmd, check=False, shell=True) return new_path if isinstance(d, list): - return [_copy_files_in_dict(x, dataset) for x in d] + return [copy_files_in_dict(x, dataset) for x in d] if isinstance(d, dict): - return {k: _copy_files_in_dict(v, dataset) for k, v in d.items()} + return {k: copy_files_in_dict(v, dataset) for k, v in d.items()} return d -def _pretty_format_samples(samples: list[dict[str, str]]) -> str: - return ', '.join(f"{s['id']}/{s['external_id']}" for s in samples) - - -def _process_existing_test_samples( - test_project: str, samples: list[dict[str, str]] -) -> dict[str, dict[str, str]]: - """ - Removes samples that need to be removed and returns those that need to be kept - """ - test_samples = sapi.get_samples( - body_get_samples={ - 'project_ids': [test_project], - 'active': True, - } - ) - external_ids = [s['external_id'] for s in samples] - test_samples_to_remove = [ - s for s in test_samples if s['external_id'] not in external_ids - ] - test_samples_to_keep = [s for s in test_samples if s['external_id'] in external_ids] - if test_samples_to_remove: - logger.info( - f'Removing test samples: {_pretty_format_samples(test_samples_to_remove)}' - ) - for s in test_samples_to_remove: - sapi.update_sample(s['id'], SampleUpsert(active=False)) - - if test_samples_to_keep: - logger.info( - f'Test samples already exist: {_pretty_format_samples(test_samples_to_keep)}' - ) - - return {s['external_id']: s for s in test_samples_to_keep} - - def file_exists(path: str) -> bool: """ Check if the object exists, where the object can be: @@ -730,5 +729,45 @@ def file_exists(path: str) -> bool: if __name__ == '__main__': - # pylint: disable=no-value-for-parameter - main() + parser = ArgumentParser(description='Argument parser for subset generator') + parser.add_argument( + '--project', required=True, help='The sample-metadata project ($DATASET)' + ) + parser.add_argument('-n', type=int, help='# Random Samples to copy', default=0) + parser.add_argument('-f', type=int, help='# Random families to copy', default=0) + # Flag to be used when there isn't available pedigree/family information. + parser.add_argument( + '--skip-ped', + action='store_true', + help='Skip transferring pedigree/family information', + ) + parser.add_argument( + '--families', + nargs='+', + help='Additional families to include.', + type=str, + default={}, + ) + parser.add_argument( + '--samples', + nargs='+', + help='Additional samples to include.', + type=str, + default={}, + ) + parser.add_argument( + '--noninteractive', action='store_true', help='Skip interactive confirmation' + ) + args, fail = parser.parse_known_args() + if fail: + parser.print_help() + raise AttributeError(f'Invalid arguments: {fail}') + + main( + project=args.project, + samples_n=args.n, + families_n=args.f, + additional_samples=set(args.samples), + additional_families=set(args.families), + skip_ped=args.skip_ped, + )