From 3e7179f520a8987a7c431f010635c3d3b8d445ab Mon Sep 17 00:00:00 2001 From: vivbak Date: Tue, 26 Sep 2023 11:56:50 +1000 Subject: [PATCH] remove gross for-loop + fixes --- scripts/create_test_subset.py | 98 ++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 41 deletions(-) diff --git a/scripts/create_test_subset.py b/scripts/create_test_subset.py index 818c3e46b..785c2743a 100755 --- a/scripts/create_test_subset.py +++ b/scripts/create_test_subset.py @@ -250,44 +250,6 @@ def transfer_samples_sgs_assays( """ logging.info('Transferring samples, sequencing groups, and assays') for s in samples: - sample_sgs: list[SequencingGroupUpsert] = [] - for sg in s.get('sequencingGroups'): - sg_assays: list[AssayUpsert] = [] - existing_sg = get_existing_sg( - existing_data, s.get('externalId'), sg.get('type') - ) - _existing_sgid = existing_sg.get('id') if existing_sg else None - for assay in sg.get('assays'): - _existing_assay: dict[str, str] = {} - if _existing_sgid: - _existing_assay = get_existing_assay( - existing_data, - s.get('externalId'), - _existing_sgid, - assay.get('type'), - ) - existing_assay_id = ( - _existing_assay.get('id') if _existing_assay else None - ) - assay_upsert = AssayUpsert( - type=assay.get('type'), - id=existing_assay_id, - external_ids=assay.get('externalIds') or {}, - # sample_id=self.s, - meta=assay.get('meta'), - ) - sg_assays.append(assay_upsert) - sg_upsert = SequencingGroupUpsert( - id=_existing_sgid, - external_ids=sg.get('externalIds') or {}, - meta=sg.get('meta'), - platform=sg.get('platform'), - technology=sg.get('technology'), - type=sg.get('type'), - assays=sg_assays, - ) - sample_sgs.append(sg_upsert) - sample_type = None if s['type'] == 'None' else s['type'] existing_sid: str | None = None existing_sample = get_existing_sample(existing_data, s['externalId']) @@ -303,7 +265,7 @@ def transfer_samples_sgs_assays( type=sample_type or None, meta=(copy_files_in_dict(s['meta'], project) or {}), participant_id=existing_pid, - sequencing_groups=sample_sgs, + sequencing_groups=upsert_sequencing_groups(s, existing_data), id=existing_sid, ) @@ -315,6 +277,60 @@ def transfer_samples_sgs_assays( ) +def upsert_sequencing_groups( + sample: dict, existing_data: dict +) -> list[SequencingGroupUpsert]: + """Create SG Upsert Objects for a sample""" + sgs_to_upsert: list[SequencingGroupUpsert] = [] + for sg in sample.get('sequencingGroups'): + existing_sg = get_existing_sg( + existing_data, sample.get('externalId'), sg.get('type') + ) + existing_sgid = existing_sg.get('id') if existing_sg else None + sg_upsert = SequencingGroupUpsert( + id=existing_sgid, + external_ids=sg.get('externalIds') or {}, + meta=sg.get('meta'), + platform=sg.get('platform'), + technology=sg.get('technology'), + type=sg.get('type'), + assays=upsert_assays( + sg, existing_sgid, existing_data, sample.get('externalId') + ), + ) + sgs_to_upsert.append(sg_upsert) + + return sgs_to_upsert + + +def upsert_assays( + sg: dict, existing_sgid: str | None, existing_data: dict, sample_external_id +) -> list[AssayUpsert]: + """Create Assay Upsert Objects for a sequencing group""" + print(sg) + assays_to_upsert: list[AssayUpsert] = [] + _existing_assay: dict[str, str] = {} + for assay in sg.get('assays'): + # Check if assay exists + if existing_sgid: + _existing_assay = get_existing_assay( + existing_data, + sample_external_id, + existing_sgid, + assay.get('type'), + ) + existing_assay_id = _existing_assay.get('id') if _existing_assay else None + assay_upsert = AssayUpsert( + type=assay.get('type'), + id=existing_assay_id, + external_ids=assay.get('externalIds') or {}, + meta=assay.get('meta'), + ) + + assays_to_upsert.append(assay_upsert) + return assays_to_upsert + + def transfer_analyses( samples: dict, existing_data: dict, target_project: str, project: str ): @@ -405,7 +421,7 @@ def get_existing_sg( The SG Data, or None if no match is found """ if not sg_type and not sg_id: - raise ValueError('Must provide sg_type or sg_id when getting exsisting sg') + raise ValueError('Must provide sg_type or sg_id when getting existing sg') if sample := get_existing_sample(existing_data, sample_id): for sg in sample.get('sequencingGroups'): if sg_id and sg.get('id') == sg_id: @@ -420,7 +436,7 @@ def get_existing_assay( data: dict, sample_id: str, sg_id: str, assay_type: str ) -> dict | None: """ - Find assay in main data for this SGID + Find assay in main data for this sample Returns: The Assay Data, or None if no match is found """