Add asyncio wait_for with 60-second maximum, silence excessive logs
EddieLF committed Sep 25, 2023
1 parent e566eb2 commit c8c654b
Showing 2 changed files with 29 additions and 8 deletions.
17 changes: 11 additions & 6 deletions metamist/audit/audit_upload_bucket.py
@@ -59,9 +59,11 @@ def get_sequencing_types():
 
 async def _get_sequencing_types():
     """Return the list of sequencing types from the enum table."""
+    logging.getLogger().setLevel(logging.WARN)
     sequencing_types = await query_async(  # pylint: disable=unsubscriptable-object
         SEQUENCING_TYPES_QUERY
     )
+    logging.getLogger().setLevel(logging.INFO)
     return sequencing_types['enum']['sequencingType']
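The WARN/INFO toggling around query_async recurs throughout this commit: the root logger is raised to WARN to mute chatty INFO output during the query, then dropped back to INFO afterwards. A sketch of a hypothetical context-manager helper (not part of this commit) that restores whatever level was previously set, even if the query raises:

    # Hypothetical helper, not in this commit: temporarily raise the root
    # logger's level to suppress INFO chatter, then restore the old level.
    import logging
    from contextlib import contextmanager

    @contextmanager
    def quiet_logs(level: int = logging.WARN):
        root = logging.getLogger()
        previous = root.level
        root.setLevel(level)
        try:
            yield
        finally:
            root.setLevel(previous)  # runs even if the body raises

Each setLevel pair in the diff could then become a `with quiet_logs():` block around the await, and the hard-coded reset to INFO disappears.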


@@ -70,12 +72,15 @@ def audit_upload_bucket(
 ):
     """Entrypoint for running upload bucket auditor asynchronously."""
     asyncio.get_event_loop().run_until_complete(
-        audit_upload_bucket_async(
-            dataset,
-            sequencing_types,
-            file_types,
-            default_analysis_type,
-            default_analysis_status,
+        asyncio.wait_for(
+            audit_upload_bucket_async(
+                dataset,
+                sequencing_types,
+                file_types,
+                default_analysis_type,
+                default_analysis_status,
+            ),
+            timeout=60
         )
     )
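asyncio.wait_for cancels the wrapped coroutine and raises asyncio.TimeoutError once the timeout elapses, so the audit now aborts after 60 seconds instead of hanging indefinitely. A minimal, self-contained illustration of that behaviour (a stand-in coroutine, not the real auditor):

    import asyncio

    async def long_audit():
        await asyncio.sleep(120)  # stands in for a slow audit run

    try:
        asyncio.get_event_loop().run_until_complete(
            asyncio.wait_for(long_audit(), timeout=60)
        )
    except asyncio.TimeoutError:
        print('audit timed out after 60 seconds')  # inner task was cancelled

Note that the TimeoutError propagates out of run_until_complete, so callers of audit_upload_bucket will see the exception unless they catch it.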

20 changes: 18 additions & 2 deletions metamist/audit/generic_auditor.py
@@ -55,6 +55,7 @@
             id
             meta
             output
+            type
             timestampCompleted
         }
     }
@@ -105,9 +106,12 @@ async def get_participant_data_for_dataset(self) -> list[dict]:
         Returned list includes all samples and assays associated with the participants.
         """
 
+        logging.getLogger().setLevel(logging.WARN)
         participant_query_results = await query_async(
             QUERY_PARTICIPANTS_SAMPLES_SGS_ASSAYS, {'datasetName': self.dataset}
         )
+        logging.getLogger().setLevel(logging.INFO)
+
         participant_data = participant_query_results['project']['participants']
 
         filtered_participants = []
@@ -132,6 +136,8 @@ def get_most_recent_analyses_by_sg(analyses_list: list[dict[str, Any]]) -> dict[
         for sg_analyses in analyses_list:
             sg_id = sg_analyses['id']
             analyses = sg_analyses['analyses']
+            if not analyses:
+                continue
             if len(analyses) == 1:
                 most_recent_analysis_by_sg[sg_id] = analyses[0]
                 continue
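The new guard skips sequencing groups whose analyses list is empty; previously an empty list would fall through to the comparison logic below. That logic is collapsed in this view; a plausible sketch of the multi-analysis case, assuming timestampCompleted is an ISO-8601 string (so lexicographic order matches chronological order):

    # Assumed continuation (collapsed in this diff): keep the analysis
    # with the latest completion timestamp for each sequencing group.
    most_recent_analysis_by_sg[sg_id] = max(
        analyses, key=lambda a: a['timestampCompleted'] or ''
    )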
@@ -229,7 +235,11 @@ async def get_analysis_cram_paths_for_dataset_sgs(
         Returns a dict mapping {sg_id : (analysis_id, cram_path) }
         """
         sg_ids = list(assay_sg_id_map.values())
+
+        logging.getLogger().setLevel(logging.WARN)
         analyses_query_result = await query_async(QUERY_SG_ANALYSES, {'dataset': self.dataset, 'sgId': sg_ids, 'analysisTypes': ['CRAM']})
+        logging.getLogger().setLevel(logging.INFO)
+
         analyses = analyses_query_result['sequencingGroups']
         analyses = self.get_most_recent_analyses_by_sg(analyses_list=analyses)
 
@@ -263,15 +273,21 @@ async def analyses_for_sgs_without_crams(self, sgs_without_crams: list[str]):
 
         all_sg_analyses: dict[str, list[dict[str, int | str]]] = defaultdict(list)
 
-        sg_analyses = await query_async(  # pylint: disable=unsubscriptable-object
+        logging.getLogger().setLevel(logging.WARN)
+        sg_analyse_query_result = await query_async(
             QUERY_SG_ANALYSES,
             {'dataset': self.dataset, 'sgIds': sgs_without_crams, 'analysisTypes': [t for t in ANALYSIS_TYPES if t != 'CRAM']},
-        )['sequencingGroups']
+        )
+        logging.getLogger().setLevel(logging.INFO)
+
+        sg_analyses = sg_analyse_query_result['sequencingGroups']
 
         for sg_analysis in sg_analyses:
             sg_id = sg_analysis['id']
 
             for analysis in sg_analysis['analyses']:
+                if not analysis.get('type'):
+                    print(sg_id, analysis)
                 analysis_entry = {
                     'analysis_id': analysis['id'],
                     'analysis_type': analysis['type'],
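The hunk ends mid-dict; given the defaultdict(list) declared above, each entry is presumably appended per sequencing group. A sketch of that assumed continuation (field names beyond the two shown are guesses from the GraphQL query):

    # Assumed continuation of the loop (collapsed in this diff):
    analysis_entry = {
        'analysis_id': analysis['id'],
        'analysis_type': analysis['type'],
        'analysis_output': analysis['output'],  # assumption: field from the query
        'timestamp_completed': analysis['timestampCompleted'],  # assumption
    }
    all_sg_analyses[sg_id].append(analysis_entry)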
