From 90d6e16c2630db0cb384f738625c840abc3b9d69 Mon Sep 17 00:00:00 2001
From: EddieLF
Date: Tue, 12 Sep 2023 17:51:15 +1000
Subject: [PATCH] Update auditing module with fixes and improvements

---
 metamist/audit/README.md              | 146 ++++++++++++++++++++
 metamist/audit/audit_upload_bucket.py | 158 ++++++++++++---------
 metamist/audit/generic_auditor.py     | 191 ++++++++++----------------
 3 files changed, 312 insertions(+), 183 deletions(-)
 create mode 100644 metamist/audit/README.md

diff --git a/metamist/audit/README.md b/metamist/audit/README.md
new file mode 100644
index 000000000..8b7ae0242
--- /dev/null
+++ b/metamist/audit/README.md
@@ -0,0 +1,146 @@
+# Metamist Audit
+
+This module contains the code for the Metamist audit tool, which is used to check the integrity of the
+Metamist database against the contents of the various dataset upload buckets.
+
+## Upload bucket auditor
+
+This auditor looks for sequence files in the main upload bucket that can be deleted or ingested,
+as well as sequencing groups that have no completed CRAM.
+
+### Procedure
+
+1. Input a Metamist dataset, the sequencing types to audit, and the file types to search for in the bucket
+2. Get all the participants, samples, sequencing groups, and assays for this dataset
+3. Search the upload bucket for all assay files, and compare them to the read files recorded in the Metamist assays
+    - If there are discrepancies, check whether the file name and size in the bucket match those recorded in Metamist
+    - If the name and file size match, assume the file has simply been moved to a new location within the bucket
+4. Check if a sequencing group has a completed CRAM - if so, its assay read files can be deleted
+5. Any remaining assay data in the bucket might require ingestion
+6. Create reports in the audit_results folder of the upload bucket, listing the assay read files to delete,
+   the files to ingest, and any sequencing groups without completed CRAMs.
+
+
+### How it works
+
+The auditor initialises an instance of the UploadBucketAuditor class, which inherits from GenericAuditor.
+The auditor queries the dataset using GraphQL, returning all the participants, samples, sequencing groups, and assays (in that hierarchy).
+It then builds several mappings between the Metamist entries, including:
+
+ - sequencing_group_id -> sample_id
+ - assay_id -> sequencing_group_id
+ - assay_id -> read_file
+ - sequencing_group_id -> cram_file
+
+The auditor then lists all the assay files in the upload bucket and compares them to the assay read files recorded in Metamist. Using the mappings above, it decides for each discovered file whether it can be deleted or still needs to be ingested.
+
+For example, consider the results of the following query and the files we can see in the upload bucket.
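+
+Under the hood, `get_participant_data_for_dataset()` is built on a GraphQL document (added as `QUERY_DATASET` in `generic_auditor.py`). The sketch below is abridged and illustrative rather than the exact document the auditor uses - its field selection simply mirrors the example response shown further down:
+
+```python
+from metamist.graphql import gql, query
+
+# Illustrative only: see QUERY_DATASET in generic_auditor.py for the real document.
+DATASET_QUERY = gql(
+    """
+    query DatasetData($datasetName: String!) {
+        project(name: $datasetName) {
+            participants {
+                id
+                externalId
+                samples {
+                    id
+                    externalId
+                    sequencingGroups {
+                        id
+                        type
+                        assays { id meta }
+                        analyses { id output timestampCompleted }
+                    }
+                }
+            }
+        }
+    }
+    """
+)
+
+participants = query(DATASET_QUERY, {'datasetName': 'test-dataset'})['project']['participants']
+```
+
+In practice the auditor exposes this data through `get_participant_data_for_dataset()`, which is what the example below uses: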
+
+```python
+import json
+from metamist.audit.audit_upload_bucket import FASTQ_EXTENSIONS, UploadBucketAuditor
+
+auditor = UploadBucketAuditor(
+    dataset='test-dataset',
+    sequencing_types=['genome'],
+    file_types=FASTQ_EXTENSIONS,
+)
+participant_data = auditor.get_participant_data_for_dataset()
+
+print(json.dumps(participant_data, indent=4))
+```
+
+```json
+[
+    {
+        "id": 123,
+        "externalId": "Participant_1",
+        "samples": [
+            {
+                "id": "XPG123456",
+                "externalId": "Sample_1",
+                "sequencingGroups": [
+                    {
+                        "id": "CPG123456",
+                        "type": "genome",
+                        "assays": [
+                            {
+                                "id": 1092,
+                                "meta": {
+                                    "reads": [
+                                        {
+                                            "location": "gs://cpg-test-dataset-upload/Sample_1_L001_R1.fastq.gz",
+                                            "size": 13990183007
+                                        },
+                                        {
+                                            "location": "gs://cpg-test-dataset-upload/Sample_1_L001_R2.fastq.gz",
+                                            "size": 14574318102
+                                        }
+                                    ]
+                                }
+                            }
+                        ],
+                        "analyses": [
+                            {
+                                "id": 456,
+                                "output": "gs://cpg-test-dataset-main/cram/CPG123456.cram",
+                                "timestampCompleted": "2023-09-01T05:04:24"
+                            }
+                        ]
+                    }
+                ]
+            }
+        ]
+    },
+    {
+        "id": 124,
+        "externalId": "Participant_2",
+        "samples": [
+            {
+                "id": "XPG123467",
+                "externalId": "Sample_2",
+                "sequencingGroups": [
+                    {
+                        "id": "CPG123467",
+                        "type": "genome",
+                        "assays": [
+                            {
+                                "id": 1093,
+                                "meta": {
+                                    "reads": [
+                                        {
+                                            "location": "gs://cpg-test-dataset-upload/Sample_2_L001_R1.fastq.gz",
+                                            "size": 13514368650
+                                        },
+                                        {
+                                            "location": "gs://cpg-test-dataset-upload/Sample_2_L001_R2.fastq.gz",
+                                            "size": 13834661895
+                                        }
+                                    ]
+                                }
+                            }
+                        ],
+                        "analyses": []
+                    }
+                ]
+            }
+        ]
+    }
+]
+```
+
+We can see that Participant_1 has a completed cram, and Participant_2 does not. In both cases the assay associated with each participant's sequencing group contains two read files. Next, look in the upload bucket to decide which files should be deleted and which should be ingested.
+
+```bash
+$ gsutil ls gs://cpg-test-dataset-upload
+
+gs://cpg-test-dataset-upload/Sample_1_L001_R1.fastq.gz
+gs://cpg-test-dataset-upload/Sample_1_L001_R2.fastq.gz
+gs://cpg-test-dataset-upload/Sample_2_L001_R1.fastq.gz
+gs://cpg-test-dataset-upload/Sample_2_L001_R2.fastq.gz
+
+$ gsutil ls gs://cpg-test-dataset-main/cram
+gs://cpg-test-dataset-main/cram/CPG123456.cram
+```
+
+For the first two fastq files, we know they are associated with assay 1092, and that the cram for sequencing group CPG123456 has been completed and matches a file in the /cram folder of the main bucket. Therefore, we should delete these files.
+
+For the second two fastq files, we know they are associated with assay 1093, and that the cram for sequencing group CPG123467 has not been completed. Therefore, we should ingest these files.
diff --git a/metamist/audit/audit_upload_bucket.py b/metamist/audit/audit_upload_bucket.py
index d16d84eed..024f513d8 100644
--- a/metamist/audit/audit_upload_bucket.py
+++ b/metamist/audit/audit_upload_bucket.py
@@ -1,21 +1,9 @@
 #!/usr/bin/env python
 """
-This auditor looks for sequence files in the main upload bucket that can be deleted or ingested,
-as well as samples that have no completed cram.
-
-Procedure:
-1. Input a Metamist dataset, sequence types to audit, and file types to search for in the bucket
-2. Get all the participants, samples, sequencing groups, and assays for this dataset
-3. Search the upload bucket for all assay files, and compare to the files in the metamist assay reads
-    - If there are discrepencies, check if the file name and size is the same in the bucket as it is in metamist
-    - If the name and file size are the same, assume this file has just been moved around in the bucket
-4. Check if a sequencing group has a completed cram - if so, its assay read files can be removed
-5. 
Any remaining assay data in the bucket might require ingestion -6. Create reports in the audit_results folder of the upload bucket, containing assay read files to delete, - ingest, and any sequencing groups without completed crams. +Report sequence files in the main-upload bucket that can be deleted, ingested, +and sequencing groups that have no aligned CRAM. """ - from datetime import datetime import sys import logging @@ -26,6 +14,8 @@ from cpg_utils.config import get_config from metamist.audit.generic_auditor import GenericAuditor +from metamist.graphql import query, gql + FASTQ_EXTENSIONS = ('.fq.gz', '.fastq.gz', '.fq', '.fastq') BAM_EXTENSIONS = ('.bam',) @@ -51,34 +41,35 @@ 'all': ALL_EXTENSIONS, } -SEQUENCING_TYPES_MAP = { - 'genome': [ - 'genome', - ], - 'exome': [ - 'exome', - ], - 'all': [ - 'genome', - 'exome', - ], -} + +def _get_sequencing_types(): + """Return the list of sequencing types from the enum table.""" + return query( # pylint: disable=unsubscriptable-object + gql(""" + query seqTypes { + enum { + sequencingType + } + } + """ + ) + )['enum']['sequencingType'] class UploadBucketAuditor(GenericAuditor): - """Auditor specifically for upload cloud storage buckets""" + """Auditor for upload cloud storage buckets""" def __init__( self, dataset: str, - sequencing_type: list[str], + sequencing_types: list[str], file_types: tuple[str], default_analysis_type='cram', default_analysis_status='completed', ): super().__init__( dataset=dataset, - sequencing_type=sequencing_type, + sequencing_types=sequencing_types, file_types=file_types, default_analysis_type=default_analysis_type, default_analysis_status=default_analysis_status, @@ -87,8 +78,8 @@ def __init__( def write_upload_bucket_audit_reports( self, bucket_name: str, - sequencing_type_str: str, - file_types_str: str, + sequencing_types: str, + file_types: str, assay_files_to_delete: list[tuple[str, int, str, list[int]]], assay_files_to_ingest: list[tuple[str, str, str, int, str]], unaligned_sgs: list[tuple[str, str]], @@ -102,12 +93,24 @@ def write_upload_bucket_audit_reports( """ today = datetime.today().strftime('%Y-%m-%d') - report_path = f'{bucket_name}/audit_results/{today}/' + report_path = f'gs://{bucket_name}/audit_results/{today}/' + + if sequencing_types == _get_sequencing_types(): + sequencing_types = 'all' + else: + ('_').join(sequencing_types) + + if file_types == ALL_EXTENSIONS: + file_types = 'all' + elif file_types == READ_EXTENSIONS: + file_types = 'all_reads' + else: + file_types = ('_').join(file_types) if not assay_files_to_delete: logging.info('No assay read files to delete found. Skipping report...') else: - assays_to_delete_file = f'{self.dataset}_{file_types_str}_{sequencing_type_str}_assay_files_to_delete_{today}.csv' + assays_to_delete_file = f'{self.dataset}_{file_types}_{sequencing_types}_assay_files_to_delete_{today}.csv' self.write_csv_report_to_cloud( data_to_write=assay_files_to_delete, report_path=os.path.join(report_path, assays_to_delete_file), @@ -115,7 +118,7 @@ def write_upload_bucket_audit_reports( 'SG_ID', 'Assay_ID', 'Assay_Read_File_Path', - 'Analysis_IDs', + 'CRAM_Analysis_ID', 'Filesize', ], ) @@ -124,7 +127,7 @@ def write_upload_bucket_audit_reports( if not assay_files_to_ingest: logging.info('No assay reads to ingest found. 
Skipping report...') else: - assays_to_ingest_file = f'{self.dataset}_{file_types_str}_{sequencing_type_str}_assay_files_to_ingest_{today}.csv' + assays_to_ingest_file = f'{self.dataset}_{file_types}_{sequencing_types}_assay_files_to_ingest_{today}.csv' self.write_csv_report_to_cloud( data_to_write=assay_files_to_ingest, report_path=os.path.join(report_path, assays_to_ingest_file), @@ -141,10 +144,10 @@ def write_upload_bucket_audit_reports( # Write the sequencing groups without any completed cram to a csv if not unaligned_sgs: logging.info( - f'No sequencing groups without crams found. Skipping report...' + 'No sequencing groups without crams found. Skipping report...' ) else: - unaligned_sgs_file = f'{self.dataset}_{file_types_str}_{sequencing_type_str}_unaligned_sgs_{today}.csv' + unaligned_sgs_file = f'{self.dataset}_{file_types}_{sequencing_types}_unaligned_sgs_{today}.csv' self.write_csv_report_to_cloud( data_to_write=unaligned_sgs, report_path=os.path.join(report_path, unaligned_sgs_file), @@ -152,38 +155,53 @@ def write_upload_bucket_audit_reports( ) -async def audit_upload_bucket_files( - dataset, sequencing_type, file_types, default_analysis_type, default_analysis_status +async def audit_upload_bucket( + dataset, sequencing_types, file_types, default_analysis_type, default_analysis_status, ): """ Finds sequence files for samples with completed CRAMs and adds these to a csv for deletion. Also finds any extra files in the upload bucket which may be uningested sequence data. Reports any files to delete, files to ingest, and samples without completed crams in output files + + Arguments: + dataset: The dataset to audit + sequencing_types: The sequencing types to audit + file_types: The file types to audit + default_analysis_type: The default analysis type to audit + default_analysis_status: The default analysis status to audit """ + + # Validate user inputs + if sequencing_types == ('all',): + sequencing_types = _get_sequencing_types() + else: + if any(st not in (allowed_sequencing_types := _get_sequencing_types()) for st in sequencing_types): + raise ValueError( + f'Input sequencing types "{sequencing_types}" must be in the allowed types: {allowed_sequencing_types}' + ) + + if file_types not in (('all',), ('all_reads',)): + if any(ft not in FILE_TYPES_MAP for ft in file_types): + raise ValueError(f'Input file types "{file_types}" must be in the allowed types {(", ").join(list(FILE_TYPES_MAP.keys()))}') + else: + file_types = FILE_TYPES_MAP[file_types[0]] + config = get_config() + bucket = config['storage']['default']['upload'] if not dataset: dataset = config['workflow']['dataset'] - bucket_name = config['storage']['default']['upload'] - bucket_name = f'gs://cpg-{dataset}-main-upload' - + # Initialise the auditor auditor = UploadBucketAuditor( dataset=dataset, - sequencing_type=SEQUENCING_TYPES_MAP.get(sequencing_type), - file_types=FILE_TYPES_MAP.get(file_types), + sequencing_types=sequencing_types, + file_types=file_types, default_analysis_type=default_analysis_type, default_analysis_status=default_analysis_status, ) - # Get all the participants and all the samples mapped to participants + # Get all the participants in the dataset and their participant_data = auditor.get_participant_data_for_dataset() - - # Get all the samples - sample_internal_external_id_map = auditor.map_internal_to_external_sample_ids( - participant_data - ) - - # Get all the sequences for the samples in the dataset and map them to their samples and reads ( sg_sample_id_map, assay_sg_id_map, @@ -197,6 +215,11 
@@ async def audit_upload_bucket_files( sg_completion = auditor.get_complete_and_incomplete_sgs( assay_sg_id_map, sg_cram_paths ) + + # Find sequencing groups without completed crams + sample_internal_external_id_map = auditor.map_internal_to_external_sample_ids( + participant_data + ) unaligned_sgs = [ ( sg_id, @@ -207,11 +230,12 @@ async def audit_upload_bucket_files( ] # Samples with completed crams can have their sequences deleted - these are the obvious ones + ( reads_to_delete, reads_to_ingest, ) = await auditor.get_reads_to_delete_or_ingest( - bucket_name, + bucket, sg_completion.get('complete'), assay_filepaths_filesizes, sg_sample_id_map, @@ -224,9 +248,9 @@ async def audit_upload_bucket_files( ) auditor.write_upload_bucket_audit_reports( - bucket_name, - sequencing_type_str=sequencing_type, - file_types_str=file_types, + bucket, + sequencing_types=sequencing_types, + file_types=file_types, assay_files_to_delete=reads_to_delete, assay_files_to_ingest=possible_assay_ingests, unaligned_sgs=unaligned_sgs, @@ -240,31 +264,31 @@ async def audit_upload_bucket_files( help='Metamist dataset, used to filter samples', ) @click.option( - '--sequencing-type', + '--sequencing-types', '-s', - type=click.Choice(['genome', 'exome', 'all']), - required='True', - help='genome, exome, or all', + multiple=True, + required=True, + help='Sequencing types to filter on, use all to include all sequencing types', ) @click.option( '--file-types', '-f', - type=click.Choice(['fastq', 'bam', 'cram', 'all_reads', 'gvcf', 'vcf', 'all']), + multiple=True, required='True', help='Find fastq, bam, cram, gvcf, vcf, all_reads (fastq + bam + cram), or all sequence file types', ) def main( - dataset, - sequencing_type, - file_types, + dataset: str, + sequencing_types: tuple[str], + file_types: tuple[str], default_analysis_type='cram', default_analysis_status='completed', ): """Runs the auditor on the dataset""" asyncio.run( - audit_upload_bucket_files( + audit_upload_bucket( dataset, - sequencing_type, + sequencing_types, file_types, default_analysis_type, default_analysis_status, @@ -276,7 +300,7 @@ def main( logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(module)s:%(lineno)d - %(message)s', - datefmt='%Y-%M-%d %H:%M:%S', + datefmt='%Y-%m-%d %H:%M:%S', stream=sys.stderr, ) main() # pylint: disable=no-value-for-parameter diff --git a/metamist/audit/generic_auditor.py b/metamist/audit/generic_auditor.py index d39debf26..be8b8fec7 100644 --- a/metamist/audit/generic_auditor.py +++ b/metamist/audit/generic_auditor.py @@ -2,6 +2,7 @@ import logging import os from typing import Any +from datetime import datetime from gql.transport.requests import log as requests_logger from metamist.audit.audithelper import AuditHelper @@ -25,8 +26,8 @@ query DatasetData($datasetName: String!) { project(name: $datasetName) { participants { - externalId id + externalId samples { id externalId @@ -42,13 +43,14 @@ } } } - """ + """ ) QUERY_SG_ANALYSES = gql( """ - query sgAnalyses($sgId: String!, $analysisType: String!) { - sequencingGroups(id: {eq: $sgId}) { + query sgAnalyses($dataset: String!, $sgIds: [String!], $analysisType: String!) 
{ + sequencingGroups(id: {in_: $sgIds}, project: {eq: $dataset}) { + id analyses(status: {eq: COMPLETED}, type: {eq: $analysisType}) { id meta @@ -57,7 +59,7 @@ } } } -""" + """ ) # Variable type definitions @@ -68,7 +70,7 @@ AssayReportEntry = namedtuple( 'AssayReportEntry', - 'sg_id assay_id assay_file_path analysis_ids filesize', + 'sg_id assay_id assay_file_path analysis_id filesize', ) @@ -79,7 +81,7 @@ class GenericAuditor(AuditHelper): def __init__( self, dataset: str, - sequencing_type: list[str], + sequencing_types: list[str], file_types: tuple[str], default_analysis_type='cram', default_analysis_status='completed', @@ -90,7 +92,7 @@ def __init__( super().__init__(search_paths=None) self.dataset = dataset - self.sequencing_type = sequencing_type + self.sequencing_types = sequencing_types self.file_types = file_types self.default_analysis_type: str = default_analysis_type self.default_analysis_status: str = default_analysis_status @@ -100,8 +102,7 @@ def __init__( def get_participant_data_for_dataset(self) -> list[dict]: """ Uses a graphQL query to return all participants in a Metamist dataset. - Nested in the return are all samples associated with the participants, - and then all the assays associated with those samples. + Returned list includes all samples and assays associated with the participants. """ participant_data = query( # pylint: disable=unsubscriptable-object @@ -119,41 +120,25 @@ def get_participant_data_for_dataset(self) -> list[dict]: return filtered_participants - # TODO - # Remove this method? @staticmethod - def map_participants_to_samples( - participants: list[dict], - ) -> dict[ParticipantExternalId, list[SampleId]]: + def get_most_recent_analyses_by_sg(analyses_list: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: """ - Returns the {external_participant_id : sample_internal_id} - mapping for a Metamist dataset - - Also reports if any participants have more than one sample + Takes a list of completed analyses for a number of sequencing groups and returns the latest + completed analysis for each sequencing group, creating a 1:1 mapping of SG to analysis. """ + most_recent_analysis_by_sg = {} - # Create mapping of participant external id to sample id - participant_eid_sample_id_map = { - participant['externalId']: [ - sample['id'] for sample in participant['samples'] - ] - for participant in participants - } + for sg_analyses in analyses_list: + sg_id = sg_analyses['id'] + analyses = sg_analyses['analyses'] + if len(analyses) == 1: + most_recent_analysis_by_sg[sg_id] = analyses[0] + continue - # Create mapping of participant external id to internal id - participant_eid_to_iid = { - participant['externalId']: participant['id'] for participant in participants - } + sorted_analyses = sorted(analyses, key=lambda x: datetime.strptime(x['timestampCompleted'], '%Y-%m-%dT%H:%M:%S')) + most_recent_analysis_by_sg[sg_id] = sorted_analyses[-1] - # Report if any participants have more than one sample id - for participant_eid, sample_ids in participant_eid_sample_id_map.items(): - if len(sample_ids) > 1: - logging.info( - f'Participant {participant_eid_to_iid[participant_eid]} ' - f'(external id: {participant_eid}) associated with more than ' - f'one sample id: {sample_ids}' - ) - - return participant_eid_sample_id_map + return most_recent_analysis_by_sg @staticmethod def map_internal_to_external_sample_ids( @@ -176,7 +161,7 @@ def get_assay_map_from_participants( Returns the mappings: 1. { sg_id : sample_id } - 2. { assay_id : sg_id } + 2. { assay_id : sg_id } 3. 
{ assay_id : (read_filepath, read_filesize,) } """ @@ -191,7 +176,7 @@ def get_assay_map_from_participants( } for sample_id, sgs in sample_sgs.items(): for sg in sgs: - if not sg['type'].lower() in self.sequencing_type: + if sg['type'].lower() not in self.sequencing_types: continue sg_sample_id_map[sg['id']] = sample_id @@ -204,6 +189,16 @@ def get_assay_map_from_participants( continue assay_sg_id_map[assay['id']] = sg['id'] + + if isinstance(reads, dict): + assay_filepaths_filesizes[assay['id']].append( + ( + reads.get('location'), + reads.get('size'), + ) + ) + continue + for read in reads: if not isinstance(read, dict): logging.error( @@ -229,24 +224,13 @@ def get_analysis_cram_paths_for_dataset_sgs( Returns a dict mapping {sg_id : (analysis_id, cram_path) } """ sg_ids = list(assay_sg_id_map.values()) + analyses = query(QUERY_SG_ANALYSES, {'dataset': self.dataset, 'sgId': sg_ids, 'analysisType': 'CRAM'})['sequencingGroups'] # pylint: disable=unsubscriptable-object + analyses = self.get_most_recent_analyses_by_sg(analyses_list=analyses) - analyses = [] - for sg_id in sg_ids: - analyses.extend( - [ - sg['analyses'] - for sg in query( # pylint: disable=unsubscriptable-object - QUERY_SG_ANALYSES, {'sgId': sg_id, 'analysisType': 'CRAM'} - )['sequencingGroups'] - ] - ) - analyses = [ - analysis for analyses_list in analyses for analysis in analyses_list - ] # Report any crams missing the sequencing type crams_with_missing_seq_type = [ analysis['id'] - for analysis in analyses + for analysis in list(analyses.values()) if 'sequencing_type' not in analysis['meta'] ] if crams_with_missing_seq_type: @@ -254,35 +238,17 @@ def get_analysis_cram_paths_for_dataset_sgs( f'{self.dataset} :: CRAM analyses are missing sequencing_type field: {crams_with_missing_seq_type}' ) - # Filter the analyses based on input sequencing type - analyses = [ - analysis - for analysis in analyses - if analysis['meta'].get('sequencing_type') in self.sequencing_type - ] - - # For each sg id, collect the analysis ids and cram paths + # For each sg id, collect the analysis id and cram paths sg_cram_paths: dict[str, dict[int, str]] = defaultdict(dict) - for analysis in analyses: - # Check the analysis output path is a valid gs path to a .cram file - if not analysis['output'].startswith('gs://') and analysis[ - 'output' - ].endswith('cram'): + for sg_id, analysis in analyses.items(): + cram_path = analysis['output'] + if not cram_path.startswith('gs://') and cram_path.endswith('cram'): logging.warning( f'Analysis {analysis["id"]} invalid output path: {analysis["output"]}' ) continue - try: - sg_id = self.get_sequencing_group_ids_from_analysis(analysis)[0] - sg_cram_paths[sg_id].update( - [(analysis.get('id'), analysis.get('output'))] - ) - except ValueError: - logging.warning( - f'Analysis {analysis["id"]} missing sample or sequencing group field.' 
- ) - continue + sg_cram_paths[sg_id].update([(analysis['id'], analysis['output'])]) return sg_cram_paths @@ -291,35 +257,25 @@ def analyses_for_sgs_without_crams(self, sgs_without_crams: list[str]): all_sg_analyses: dict[str, list[dict[str, int | str]]] = defaultdict(list) - analyses = [] + sg_analyses = [] for analysis_type in ANALYSIS_TYPES: if analysis_type == 'CRAM': continue - for sg in sgs_without_crams: - analyses.extend( - [ - sg['analyses'] - for sg in query( # pylint: disable=unsubscriptable-object + sg_analyses.extend( + [ + list( + query( # pylint: disable=unsubscriptable-object QUERY_SG_ANALYSES, - {'sgId': sg, 'analysisType': analysis_type}, + {'dataset': self.dataset, 'sgIds': sgs_without_crams, 'analysisType': analysis_type}, )['sequencingGroups'] - ] - ) - analyses = [ - analysis for analyses_list in analyses for analysis in analyses_list - ] - for analysis in analyses: - try: - sg_ids = self.get_sequencing_group_ids_from_analysis(analysis) - except ValueError: - logging.warning( - f'Analysis {analysis["id"]} missing sample or sequencing group field.' - ) - continue + ) + ] + ) - for sg_id in sg_ids: - if sg_id not in sgs_without_crams: - continue + for sg_analysis in sg_analyses: + sg_id = sg_analysis['id'] + + for analysis in sg_analysis['analyses']: analysis_entry = { 'analysis_id': analysis['id'], 'analysis_type': analysis['type'], @@ -351,24 +307,24 @@ def get_complete_and_incomplete_sgs( for analyses in sg_cram_paths.values(): cram_paths.update(list(analyses.values())) - # Check the analysis paths actually point to crams that exist in the bucket + # Check the analysis CRAM paths actually exist in the bucket buckets_subdirs = self.get_gcs_bucket_subdirs_to_search(list(cram_paths)) crams_in_bucket = self.find_files_in_gcs_buckets_subdirs( buckets_subdirs, ('cram',), ) - # Incomplete samples initialised as the list of samples without a valid analysis cram output path + # Incomplete SGs initialised as the SGs without a completed CRAM incomplete_sgs = set(assay_sg_id_map.values()).difference( set(sg_cram_paths.keys()) ) - # Completed sgs are those whose completed cram output path exists in the bucket at the same path + # Completed SGs have a CRAM file in the bucket that matches the path in Metamist completed_sgs = defaultdict(list) - for sg_id, analyses in sg_cram_paths.items(): - for analysis_id, cram_path in analyses.items(): + for sg_id, analysis in sg_cram_paths.items(): + for analysis_id, cram_path in analysis.items(): if cram_path in crams_in_bucket: - completed_sgs[sg_id].append(analysis_id) + completed_sgs[sg_id] = analysis_id continue incomplete_sgs.update(sg_id) @@ -390,10 +346,13 @@ async def check_for_uningested_or_moved_assays( # pylint: disable=R0914 sample_internal_external_id_map: dict[str, str], ): """ - Compares the assays in a Metamist dataset to the assays in the main-upload bucket + Compares the assays read files in a Metamist dataset to the read files found in the + main-upload bucket. + + Input: The upload bucket name, the {assay_id : (read_path, read_size)} mapping, + the completed SGs, the { SG_ID : sample_ID } mapping, the { assay_id : SG_ID } mapping, + and the { sample_id : sample_external_id } mapping. - Input: The dataset name, the {assay_id : read_paths} mapping, the assay file types, - and the sample internal -> external id mapping. Returns: 1. Paths to assays that have not yet been ingested - checks if any completed sample external IDs are in the filename and includes this in output. 2. 
A dict mapping assay IDs to GS filepaths that have been ingested, @@ -401,7 +360,7 @@ async def check_for_uningested_or_moved_assays( # pylint: disable=R0914 assay in Metamist. If the filenames and file sizes match, these are identified as assay files that have been moved from their original location to a new location. - 3. The paths saved in metamist for the read data that has been moved. These paths go nowhere. + 3. The assay read files that have been deleted/moved. """ # Get all the paths to assay data anywhere in the main-upload bucket assay_paths_in_bucket = self.find_assay_files_in_gcs_bucket( @@ -491,15 +450,15 @@ async def check_for_uningested_or_moved_assays( # pylint: disable=R0914 for bucket_path, metamist_path in ingested_and_moved_filepaths: assay_id = reads_assays.get(metamist_path) sg_id = assay_sg_id_map.get(assay_id) - analysis_ids = completed_sgs.get(sg_id) - if sg_id not in AuditHelper.EXCLUDED_SGS and analysis_ids: + analysis_id = completed_sgs.get(sg_id) + if sg_id not in AuditHelper.EXCLUDED_SGS and analysis_id: filesize = new_assay_path_sizes[bucket_path] assays_moved_paths.append( AssayReportEntry( sg_id=sg_id, assay_id=assay_id, assay_file_path=bucket_path, - analysis_ids=analysis_ids, + analysis_id=analysis_id, filesize=filesize, ) ) @@ -544,7 +503,7 @@ async def get_reads_to_delete_or_ingest( sg_assays_id_map[sg_id].append(assay_id) assay_reads_to_delete = [] - for sg_id, analysis_ids in completed_sgs.items(): + for sg_id, analysis_id in completed_sgs.items(): if sg_id in AuditHelper.EXCLUDED_SGS: continue assay_ids = sg_assays_id_map[sg_id] @@ -559,7 +518,7 @@ async def get_reads_to_delete_or_ingest( sg_id=sg_id, assay_id=assay_id, assay_file_path=path, - analysis_ids=analysis_ids, + analysis_id=analysis_id, filesize=filesize, ) )
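
The rows appended above feed the delete and ingest reports written by `write_upload_bucket_audit_reports`. For reference, a single row of the assay-files-to-delete report can be sketched with the values from the README example; the namedtuple fields below follow the `AssayReportEntry` definition this patch updates, and the concrete values are illustrative only:

```python
from collections import namedtuple

# Field names follow the AssayReportEntry definition in generic_auditor.py,
# where this patch renames analysis_ids -> analysis_id.
AssayReportEntry = namedtuple(
    'AssayReportEntry',
    'sg_id assay_id assay_file_path analysis_id filesize',
)

# One row of the "<dataset>_..._assay_files_to_delete_<date>.csv" report:
# assay 1092 belongs to sequencing group CPG123456, whose CRAM (analysis 456)
# is completed, so its read file is safe to delete.
entry = AssayReportEntry(
    sg_id='CPG123456',
    assay_id=1092,
    assay_file_path='gs://cpg-test-dataset-upload/Sample_1_L001_R1.fastq.gz',
    analysis_id=456,
    filesize=13990183007,
)

print(entry)
```

Each such entry is written as one CSV row under `gs://<upload-bucket>/audit_results/<date>/`, with the header `SG_ID, Assay_ID, Assay_Read_File_Path, CRAM_Analysis_ID, Filesize`.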