Skip to content

Commit

Permalink
Rna ingestion updates (#676)
Browse files Browse the repository at this point in the history
* Updating parsers with library type etc

* Add new sample map column defaults, update sg meta on RNA ingest

* Update parser column default, update rna sg meta, update parsing prints

* Change reference to 'sequence' to 'assay'

* Change reference to 'library_type' to 'sequencing_library'

* Change another reference to 'library_type' to 'sequencing_library'

* Add required fields for exomes, more tidying, use tabulate

* Add tabulate to requirements

* Fix parse_manifest return for tests, update script referencing group_assays fn

* Remove required exome fields, re-add default analysis status 'completed'

* Update parser testing with RNA examples

* Remove added external_family_id from ParsedParticipant class

* Refactor parser to use SequencingDefualts class

* Collapse sequencing defaults into new class

* Update setup.py and resolve merge conflicts

* isort linting

* Fix typehint for read and ref files function return
  • Loading branch information
EddieLF authored Feb 16, 2024
1 parent 08795fa commit 9199dab
Show file tree
Hide file tree
Showing 10 changed files with 569 additions and 179 deletions.
209 changes: 145 additions & 64 deletions metamist/parser/generic_metadata_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import re
import shlex
from functools import reduce
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import click

from metamist.parser.generic_parser import ( # noqa
DefaultSequencing,
GenericParser,
GroupedRow,
ParsedAnalysis,
ParsedAssay,
ParsedSample,
ParsedSequencingGroup,
SingleRow,
run_as_sync,
Expand Down Expand Up @@ -92,27 +94,33 @@ def __init__(
seq_type_column: Optional[str] = None,
seq_technology_column: Optional[str] = None,
seq_platform_column: Optional[str] = None,
seq_facility_column: Optional[str] = None,
seq_library_column: Optional[str] = None,
read_end_type_column: Optional[str] = None,
read_length_column: Optional[str] = None,
gvcf_column: Optional[str] = None,
meta_column: Optional[str] = None,
seq_meta_column: Optional[str] = None,
batch_number: Optional[str] = None,
reference_assembly_location_column: Optional[str] = None,
default_reference_assembly_location: Optional[str] = None,
default_sequencing_type='genome',
default_sample_type=None,
default_sequencing_technology='short-read',
default_sequencing_platform='illumina',
default_sequencing=DefaultSequencing(
seq_type='genome', technology='short-read', platform='illumina'
),
default_read_end_type: Optional[str] = None,
default_read_length: Optional[str | int] = None,
allow_extra_files_in_search_path=False,
**kwargs,
):
super().__init__(
path_prefix=None,
search_paths=search_locations,
project=project,
default_sequencing_type=default_sequencing_type,
default_sample_type=default_sample_type,
default_sequencing_technology=default_sequencing_technology,
default_sequencing_platform=default_sequencing_platform,
default_sequencing=default_sequencing,
default_read_end_type=default_read_end_type,
default_read_length=default_read_length,
**kwargs,
)

Expand All @@ -130,6 +138,10 @@ def __init__(
self.seq_type_column = seq_type_column
self.seq_technology_column = seq_technology_column
self.seq_platform_column = seq_platform_column
self.seq_facility_column = seq_facility_column
self.seq_library_column = seq_library_column
self.read_end_type_column = read_end_type_column
self.read_length_column = read_length_column
self.reference_assembly_location_column = reference_assembly_location_column
self.default_reference_assembly_location = default_reference_assembly_location

Expand Down Expand Up @@ -168,14 +180,14 @@ def get_sequencing_types(self, row: GroupedRow) -> list[str]:
if isinstance(row, dict):
return [self.get_sequencing_type(row)]
return [
str(r.get(self.seq_type_column, self.default_sequencing_type)) for r in row
str(r.get(self.seq_type_column, self.default_sequencing.seq_type)) for r in row
]

def get_sequencing_technology(self, row: SingleRow) -> str:
"""Get assay technology for single row"""
value = (
row.get(self.seq_technology_column, None)
or self.default_sequencing_technology
or self.default_sequencing.technology
)
value = value.lower()

Expand All @@ -187,15 +199,15 @@ def get_sequencing_technology(self, row: SingleRow) -> str:
def get_sequencing_platform(self, row: SingleRow) -> str:
"""Get sequencing platform for single row"""
value = (
row.get(self.seq_platform_column, None) or self.default_sequencing_platform
row.get(self.seq_platform_column, None) or self.default_sequencing.platform
)
value = value.lower()

return str(value)

def get_sequencing_type(self, row: SingleRow) -> str:
"""Get assay type from row"""
value = row.get(self.seq_type_column, None) or self.default_sequencing_type
value = row.get(self.seq_type_column, None) or self.default_sequencing.seq_type
value = value.lower()

if value == 'wgs':
Expand All @@ -204,9 +216,47 @@ def get_sequencing_type(self, row: SingleRow) -> str:
value = 'exome'
elif 'mt' in value:
value = 'mtseq'
elif 'polya' in value or 'mrna' in value:
value = 'polyarna'
elif 'total' in value:
value = 'totalrna'
elif 'single' in value:
value = 'singlecellrna'

return str(value)

def get_sequencing_facility(self, row: SingleRow) -> str:
"""Get sequencing facility from row"""
value = (
row.get(self.seq_facility_column, None) or self.default_sequencing.facility
)
value = str(value) if value else None
return value

def get_sequencing_library(self, row: SingleRow) -> str:
"""Get sequencing library from row"""
value = (
row.get(self.seq_library_column, None) or self.default_sequencing.library
)
value = str(value) if value else None
return value

def get_read_end_type(self, row: SingleRow) -> str:
"""Get read end type from row"""
value = (
row.get(self.read_end_type_column, None) or self.default_read_end_type
)
value = str(value).lower() if value else None
return value

def get_read_length(self, row: SingleRow) -> int:
"""Get read length from row"""
value = (
row.get(self.read_length_column, None) or self.default_read_length
)
value = int(value) if value else None
return value

def get_assay_id(self, row: GroupedRow) -> Optional[dict[str, str]]:
"""Get external assay ID from row. Needs to be implemented per parser.
NOTE: To be re-thought after assay group changes are applied"""
Expand Down Expand Up @@ -515,6 +565,7 @@ async def get_participant_meta_from_group(self, rows: GroupedRow):
async def get_sequencing_group_meta(
self, sequencing_group: ParsedSequencingGroup
) -> dict:
"""Get sequencing group metadata from variant (vcf) files"""
meta: dict[str, Any] = {}

if not sequencing_group.sample.external_sid:
Expand Down Expand Up @@ -552,6 +603,55 @@ async def get_sequencing_group_meta(

return meta

async def get_read_and_ref_files_and_checksums(self, sample_id: str, rows: GroupedRow) -> (
Tuple[List[str], List[str], Set[str]]):
"""Get read filenames and checksums from rows."""
read_filenames: List[str] = []
read_checksums: List[str] = []
reference_assemblies: set[str] = set()
for r in rows:
_rfilenames = await self.get_read_filenames(sample_id=sample_id, row=r)
read_filenames.extend(_rfilenames)
if self.checksum_column and self.checksum_column in r:
checksums = await self.get_checksums_from_row(sample_id, r, _rfilenames)
if not checksums:
checksums = [None] * len(_rfilenames)
read_checksums.extend(checksums)
if self.reference_assembly_location_column:
ref = r.get(self.reference_assembly_location_column)
if ref:
reference_assemblies.add(ref)
return read_filenames, read_checksums, reference_assemblies

async def parse_cram_assays(self, sample: ParsedSample, reference_assemblies: set[str]) -> dict[str, Any]:
"""Parse CRAM assays"""
if len(reference_assemblies) > 1:
# sorted for consistent testing
str_ref_assemblies = ', '.join(sorted(reference_assemblies))
raise ValueError(
f'Multiple reference assemblies were defined for {sample.external_sid}: {str_ref_assemblies}'
)
if len(reference_assemblies) == 1:
ref = next(iter(reference_assemblies))
else:
ref = self.default_reference_assembly_location
if not ref:
raise ValueError(
f'Reads type for {sample.external_sid!r} is CRAM, but a reference '
f'is not defined, please set the default reference assembly path'
)

ref_fp = self.file_path(ref)
secondary_files = (
await self.create_secondary_file_objects_by_potential_pattern(
ref_fp, ['.fai']
)
)
cram_reference = await self.create_file_object(
ref_fp, secondary_files=secondary_files
)
return {'reference_assembly' : cram_reference}

async def get_assays_from_group(
self, sequencing_group: ParsedSequencingGroup
) -> list[ParsedAssay]:
Expand All @@ -566,27 +666,9 @@ async def get_assays_from_group(

assays = []

read_filenames: List[str] = []
read_checksums: List[str] = []
reference_assemblies: set[str] = set()

for r in rows:
_rfilenames = await self.get_read_filenames(
sample_id=sample.external_sid, row=r
)
read_filenames.extend(_rfilenames)
if self.checksum_column and self.checksum_column in r:
checksums = await self.get_checksums_from_row(
sample.external_sid, r, _rfilenames
)
if not checksums:
checksums = [None] * len(_rfilenames)
read_checksums.extend(checksums)

if self.reference_assembly_location_column:
ref = r.get(self.reference_assembly_location_column)
if ref:
reference_assemblies.add(ref)
read_filenames, read_checksums, reference_assemblies = await self.get_read_and_ref_files_and_checksums(
sample.external_sid, rows
)

# strip in case collaborator put "file1, file2"
full_read_filenames: List[str] = []
Expand Down Expand Up @@ -614,37 +696,32 @@ async def get_assays_from_group(
# collapsed_assay_meta['reads'] = reads[reads_type]

if reads_type == 'cram':
if len(reference_assemblies) > 1:
# sorted for consistent testing
str_ref_assemblies = ', '.join(sorted(reference_assemblies))
raise ValueError(
f'Multiple reference assemblies were defined for {sample.external_sid}: {str_ref_assemblies}'
)
if len(reference_assemblies) == 1:
ref = next(iter(reference_assemblies))
else:
ref = self.default_reference_assembly_location

if not ref:
raise ValueError(
f'Reads type for {sample.external_sid!r} is CRAM, but a reference '
f'is not defined, please set the default reference assembly path'
)

ref_fp = self.file_path(ref)
secondary_files = (
await self.create_secondary_file_objects_by_potential_pattern(
ref_fp, ['.fai']
)
collapsed_assay_meta.update(
await self.parse_cram_assays(sample, reference_assemblies)
)
cram_reference = await self.create_file_object(
ref_fp, secondary_files=secondary_files
)
collapsed_assay_meta['reference_assembly'] = cram_reference

if self.batch_number is not None:
collapsed_assay_meta['batch'] = self.batch_number

if sequencing_group.sequencing_type in ['exome', 'polyarna', 'totalrna', 'singlecellrna']:
rows = sequencing_group.rows
# Exome / RNA should have facility and library, allow missing for exomes - for now
if self.get_sequencing_facility(rows[0]):
collapsed_assay_meta['sequencing_facility'] = self.get_sequencing_facility(rows[0])
if self.get_sequencing_library(rows[0]):
collapsed_assay_meta['sequencing_library'] = self.get_sequencing_library(rows[0])
# RNA requires read end type and length as well
if sequencing_group.sequencing_type != 'exome':
collapsed_assay_meta['read_end_type'] = self.get_read_end_type(rows[0])
collapsed_assay_meta['read_length'] = self.get_read_length(rows[0])
# if any of the above fields are not set for an RNA assay, raise an error
if not all(collapsed_assay_meta.values()):
raise ValueError(
f'Not all required fields were set for RNA sample {sample.external_sid}:\n'
f'{collapsed_assay_meta}\n'
f'Use the default value arguments if they are not present in the manifest.'
)

for read in reads[reads_type]:
assays.append(
ParsedAssay(
Expand All @@ -653,8 +730,8 @@ async def get_assays_from_group(
# as we grab all reads, and then determine assay
# grouping from there.
rows=sequencing_group.rows,
internal_seq_id=None,
external_seq_ids={},
internal_assay_id=None,
external_assay_ids={},
# unfortunately hard to break them up by row in the current format
# assay_status=self.get_assay_status(rows),
assay_type='sequencing',
Expand All @@ -667,6 +744,8 @@ async def get_assays_from_group(
},
)
)
if not sequencing_group.meta:
sequencing_group.meta = self.get_sequencing_group_meta_from_assays(assays)

return assays

Expand Down Expand Up @@ -803,9 +882,9 @@ async def main(
if not manifests:
raise ValueError('Expected at least 1 manifest')

extra_seach_paths = [m for m in manifests if m.startswith('gs://')]
if extra_seach_paths:
search_path = list(set(search_path).union(set(extra_seach_paths)))
extra_search_paths = [m for m in manifests if m.startswith('gs://')]
if extra_search_paths:
search_path = list(set(search_path).union(set(extra_search_paths)))

participant_meta_map: Dict[Any, Any] = {}
sample_meta_map: Dict[Any, Any] = {}
Expand Down Expand Up @@ -839,7 +918,9 @@ async def main(
reported_gender_column=reported_gender_column,
karyotype_column=karyotype_column,
default_sample_type=default_sample_type,
default_sequencing_type=default_assay_type,
default_sequencing=DefaultSequencing(
seq_type='genome', technology='short-read', platform='illumina'
),
search_locations=search_path,
)
for manifest in manifests:
Expand Down
Loading

0 comments on commit 9199dab

Please sign in to comment.