Skip to content

Commit

Permalink
Use asyncio gather to execute updates (#713)
Browse files Browse the repository at this point in the history
  • Loading branch information
EddieLF authored Mar 20, 2024
1 parent 032900e commit ef79c7f
Showing 1 changed file with 42 additions and 22 deletions.
64 changes: 42 additions & 22 deletions scripts/back_populate_library_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
updates the assay.meta field with `facility` and `library_type` annotations.
Also updates the sequencing group meta fields with the same information, if the flag is set.
"""
import asyncio
import logging
import re
from collections import defaultdict
Expand All @@ -13,7 +14,7 @@

from metamist.apis import AssayApi, SequencingGroupApi
from metamist.graphql import gql, query
from metamist.models import AssayUpsert
from metamist.models import AssayUpsert, SequencingGroupMetaUpdateModel

logger = logging.getLogger(__file__)
logging.basicConfig(
Expand Down Expand Up @@ -123,6 +124,36 @@ def check_assay_meta_fields(assays: list[dict], update_sequencing_groups: bool):
return assay_meta


async def update_assays_async(assays_to_update: list[dict]):
    """Apply the pending assay meta updates concurrently.

    Each element of ``assays_to_update`` is a mapping of assay ID to the meta
    fields that should be written for that assay. Entries whose meta mapping
    is empty are skipped. One update call is created per remaining entry and
    all of them are awaited together; the gathered results are returned.
    """
    assay_api = AssayApi()
    pending_calls = [
        assay_api.update_assay_async(AssayUpsert(id=assay_id, meta=meta_fields))
        for entry in assays_to_update
        for assay_id, meta_fields in entry.items()
        if meta_fields
    ]
    return await asyncio.gather(*pending_calls)


async def update_sequencing_groups_async(sequencing_groups_to_update: list[dict], project: str):
    """Apply the pending sequencing group meta updates concurrently.

    Each element of ``sequencing_groups_to_update`` is a mapping of sequencing
    group ID to the meta fields that should be written for it. Entries whose
    meta mapping is empty are skipped. The meta fields are wrapped in a
    ``SequencingGroupMetaUpdateModel`` before being passed to the API, and all
    update calls for ``project`` are awaited together; the gathered results
    are returned.
    """
    sg_api = SequencingGroupApi()
    pending_calls = [
        sg_api.update_sequencing_group_async(
            sequencing_group_id=sg_id,
            project=project,
            sequencing_group_meta_update_model=SequencingGroupMetaUpdateModel(meta=meta_fields),
        )
        for entry in sequencing_groups_to_update
        for sg_id, meta_fields in entry.items()
        if meta_fields
    ]
    return await asyncio.gather(*pending_calls)


@click.command()
@click.option(
'-p',
Expand Down Expand Up @@ -152,9 +183,6 @@ def check_assay_meta_fields(assays: list[dict], update_sequencing_groups: bool):
)
def main(project: str, sequencing_type: str, update_sequencing_groups: bool, dry_run: bool):
"""Back populate facility and library_type meta fields for existing assays and sequencing groups."""
asapi = AssayApi()
sgapi = SequencingGroupApi()

sg_assays: defaultdict[str, list[dict]] = defaultdict(list)
sg_meta: dict[str, dict] = {}

Expand All @@ -167,8 +195,8 @@ def main(project: str, sequencing_type: str, update_sequencing_groups: bool, dry
sg_assays[sg_id].extend(assays)

# For logs
updated_assays: list[dict[str, dict]] = []
updated_sequencing_groups: list[dict[str, dict]] = []
assays_to_update: list[dict[str, dict]] = []
sequencing_groups_to_update: list[dict[str, dict]] = []

for sequencing_group, assays in sg_assays.items():
current_sg_facility = sg_meta[sequencing_group].get('facility')
Expand All @@ -187,34 +215,26 @@ def main(project: str, sequencing_type: str, update_sequencing_groups: bool, dry
sg_meta_fields_to_update['facility'] = assay_meta_fields_to_update['facility']
if library_type := assay_meta_fields_to_update.get('library_type'):
sg_meta_fields_to_update['library_type'] = library_type
if not dry_run:
asapi.update_assay(
AssayUpsert(id=assay_id, meta=assay_meta_fields_to_update),
)
updated_assays.append({assay_id: assay_meta_fields_to_update})
assays_to_update.append({assay_id: assay_meta_fields_to_update})

if sg_meta_fields_to_update and update_sequencing_groups:
if not dry_run:
sgapi.update_sequencing_group(
sequencing_group_id=sequencing_group,
project=project,
sequencing_group_meta_update_model=sg_meta_fields_to_update
)
updated_sequencing_groups.append({sequencing_group: sg_meta_fields_to_update})
sequencing_groups_to_update.append({sequencing_group: sg_meta_fields_to_update})

if dry_run:
logging.info(
f'Dummy run. Would have updated {len(updated_assays)} assays. {updated_assays}'
f'Dummy run. Would have updated {len(assays_to_update)} assays.\n\n{assays_to_update}'
)
if update_sequencing_groups:
logging.info(
f'Dummy run. Would have updated {len(updated_sequencing_groups)} sequencing groups. {updated_sequencing_groups}'
f'\n\nDummy run. Would have updated {len(sequencing_groups_to_update)} sequencing groups.\n\n{sequencing_groups_to_update}'
)
else:
logging.info(f'Updated {len(updated_assays)} sequences. {updated_assays}')
asyncio.run(update_assays_async(assays_to_update))
logging.info(f'Updated {len(assays_to_update)} assays.\n\n{assays_to_update}')
if update_sequencing_groups:
asyncio.run(update_sequencing_groups_async(sequencing_groups_to_update, project))
logging.info(
f'Updated {len(updated_sequencing_groups)} sequencing groups. {updated_sequencing_groups}'
f'\n\nUpdated {len(sequencing_groups_to_update)} sequencing groups.\n\n{sequencing_groups_to_update}'
)


Expand Down

0 comments on commit ef79c7f

Please sign in to comment.