Skip to content

Commit

Permalink
Update query functions with asyncio. Use asyncio entrypoints in auditor
Browse files Browse the repository at this point in the history
  • Loading branch information
EddieLF committed Sep 25, 2023
1 parent 0d596ae commit 651f98e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
33 changes: 20 additions & 13 deletions metamist/audit/audit_upload_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,26 @@
"""
)


def get_sequencing_types():
    """Synchronous entrypoint for fetching the allowed sequencing types.

    Wraps the async GraphQL query so non-async callers (e.g. the click
    option help text built at import time) can retrieve the enum values.

    Returns:
        list[str]: sequencing type names from the enum table.
    """
    # asyncio.get_event_loop() is deprecated when no loop is running
    # (Python 3.10+); asyncio.run() creates, runs, and cleanly closes a
    # fresh event loop, which is the recommended sync->async bridge.
    return asyncio.run(_get_sequencing_types())


async def _get_sequencing_types():
    """Return the list of sequencing types from the enum table.

    Awaits the GraphQL query first, then subscripts the resulting dict —
    subscripting the coroutine itself (``await query_async(...)[...]``)
    would raise TypeError before the await ever ran.

    Returns:
        list[str]: values of the ``sequencingType`` enum.
    """
    sequencing_types = await query_async(  # pylint: disable=unsubscriptable-object
        SEQUENCING_TYPES_QUERY
    )
    return sequencing_types['enum']['sequencingType']


def audit_upload_bucket(
dataset: str, sequencing_types: list[str], file_types: list[str], default_analysis_type: str, default_analysis_status: str,
):
"""Entrypoint for running upload bucket auditor asynchronously."""
asyncio.get_event_loop().run_until_complete(
audit_upload_bucket(
audit_upload_bucket_async(
dataset,
sequencing_types,
file_types,
Expand Down Expand Up @@ -92,7 +99,7 @@ def __init__(
default_analysis_status=default_analysis_status,
)

def write_upload_bucket_audit_reports(
async def write_upload_bucket_audit_reports(
self,
bucket_name: str,
sequencing_types: list[str],
Expand All @@ -112,7 +119,7 @@ def write_upload_bucket_audit_reports(

report_path = f'gs://{bucket_name}/audit_results/{today}/'

if set(sequencing_types) == set(_get_sequencing_types()):
if set(sequencing_types) == set(await _get_sequencing_types()):
sequencing_types_str = 'all'
else:
sequencing_types_str = ('_').join(sequencing_types)
Expand All @@ -123,7 +130,7 @@ def write_upload_bucket_audit_reports(
file_types_str = 'all_reads'
else:
file_types_str = ('_').join(file_types)

report_prefix = f'{self.dataset}_{file_types_str}_{sequencing_types_str}'

if not assay_files_to_delete:
Expand Down Expand Up @@ -192,9 +199,9 @@ async def audit_upload_bucket_async(

# Validate user inputs
if sequencing_types == ('all',):
sequencing_types = _get_sequencing_types()
sequencing_types = await _get_sequencing_types()
else:
if any(st not in (allowed_sequencing_types := _get_sequencing_types()) for st in sequencing_types):
if any(st not in (allowed_sequencing_types := await _get_sequencing_types()) for st in sequencing_types):
raise ValueError(
f'Input sequencing types "{sequencing_types}" must be in the allowed types: {allowed_sequencing_types}'
)
Expand All @@ -209,7 +216,7 @@ async def audit_upload_bucket_async(
if not dataset:
dataset = config['workflow']['dataset']
bucket = config['storage'][dataset]['upload']

# Initialise the auditor
auditor = UploadBucketAuditor(
dataset=dataset,
Expand All @@ -219,7 +226,7 @@ async def audit_upload_bucket_async(
default_analysis_status=default_analysis_status,
)

participant_data = auditor.get_participant_data_for_dataset()
participant_data = await auditor.get_participant_data_for_dataset()
sample_internal_external_id_map = auditor.map_internal_to_external_sample_ids(
participant_data
)
Expand All @@ -230,7 +237,7 @@ async def audit_upload_bucket_async(
) = auditor.get_assay_map_from_participants(participant_data)

# Get all completed cram output paths for the samples in the dataset and validate them
sg_cram_paths = auditor.get_analysis_cram_paths_for_dataset_sgs(assay_sg_id_map)
sg_cram_paths = await auditor.get_analysis_cram_paths_for_dataset_sgs(assay_sg_id_map)

# Identify sgs with and without completed crams
sg_completion = auditor.get_complete_and_incomplete_sgs(
Expand Down Expand Up @@ -263,7 +270,7 @@ async def audit_upload_bucket_async(
)

# Write the reads to delete, reads to ingest, and unaligned SGs reports
auditor.write_upload_bucket_audit_reports(
await auditor.write_upload_bucket_audit_reports(
bucket,
sequencing_types=sequencing_types,
file_types=file_types,
Expand All @@ -284,7 +291,7 @@ async def audit_upload_bucket_async(
'-s',
multiple=True,
required=True,
help='Sequencing types to filter on, use all to include all sequencing types',
help=f'"all", or any of {", ".join(get_sequencing_types())}',
)
@click.option(
'--file-types',
Expand Down
10 changes: 6 additions & 4 deletions metamist/audit/generic_auditor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ async def get_participant_data_for_dataset(self) -> list[dict]:
Returned list includes all samples and assays associated with the participants.
"""

participant_data = await query_async( # pylint: disable=unsubscriptable-object
participant_query_results = await query_async(
QUERY_PARTICIPANTS_SAMPLES_SGS_ASSAYS, {'datasetName': self.dataset}
)['project']['participants']
)
participant_data = participant_query_results['project']['participants']

filtered_participants = []
for participant in participant_data:
Expand Down Expand Up @@ -188,7 +189,7 @@ def get_assay_map_from_participants(
)
continue

if assay_sg_id_map[assay['id']]:
if assay_sg_id_map.get(assay['id']):
raise ValueError(
f'{self.dataset} :: Assay {assay["id"]} has multiple SGs: {assay_sg_id_map[assay["id"]]} and {sg["id"]}'
)
Expand Down Expand Up @@ -228,7 +229,8 @@ async def get_analysis_cram_paths_for_dataset_sgs(
Returns a dict mapping {sg_id : (analysis_id, cram_path) }
"""
sg_ids = list(assay_sg_id_map.values())
analyses = await query_async(QUERY_SG_ANALYSES, {'dataset': self.dataset, 'sgId': sg_ids, 'analysisTypes': ['CRAM']})['sequencingGroups'] # pylint: disable=unsubscriptable-object
analyses_query_result = await query_async(QUERY_SG_ANALYSES, {'dataset': self.dataset, 'sgId': sg_ids, 'analysisTypes': ['CRAM']})
analyses = analyses_query_result['sequencingGroups']
analyses = self.get_most_recent_analyses_by_sg(analyses_list=analyses)

# Report any crams missing the sequencing type
Expand Down

0 comments on commit 651f98e

Please sign in to comment.