From a37732955173beec5196eec986332829b2b2ecf0 Mon Sep 17 00:00:00 2001 From: Dave Lawrence Date: Tue, 14 Sep 2021 17:52:21 +0930 Subject: [PATCH] #480 - store API responses so we can check lengths --- genes/annotation_consortium_api.py | 73 ------ genes/hgvs.py | 2 +- .../import_refseq_transcript_fasta.py | 59 +++++ ...fo_transcriptversioninfofastafileimport.py | 47 ++++ genes/models.py | 218 ++++++++++++++++-- 5 files changed, 312 insertions(+), 87 deletions(-) delete mode 100644 genes/annotation_consortium_api.py create mode 100644 genes/management/commands/import_refseq_transcript_fasta.py create mode 100644 genes/migrations/0039_transcriptversioninfo_transcriptversioninfofastafileimport.py diff --git a/genes/annotation_consortium_api.py b/genes/annotation_consortium_api.py deleted file mode 100644 index 2038c57ae..000000000 --- a/genes/annotation_consortium_api.py +++ /dev/null @@ -1,73 +0,0 @@ -from urllib.error import HTTPError, URLError - -import requests -from Bio import Entrez -from django.core.cache import cache -from requests import RequestException - -from genes.models_enums import AnnotationConsortium -from library.constants import WEEK_SECS, HOUR_SECS -from snpdb.models.models_genome import GenomeBuild - - -def transcript_exists(genome_build: GenomeBuild, identifier, version) -> bool: - """ Throws exception if we can't tell """ - - transcript_exists_key = f"transcript_exists:{identifier}.{version}_{genome_build}" - transcript_connection_error_key = "error_" + transcript_exists_key - if cache.get(transcript_connection_error_key): - raise ConnectionError("Already failed within last hour") - - try: - # Cache results - true permanently, false for a week and connection error for 1 hour - exists = cache.get(transcript_exists_key) - if exists is None: - if AnnotationConsortium.get_from_transcript_accession(identifier) == AnnotationConsortium.ENSEMBL: - exists = ensembl_transcript_exists(genome_build, identifier, version) - else: - exists = refseq_transcript_exists(identifier, version) - if exists: - timeout = None # Forever - else: - timeout = WEEK_SECS - cache.set(transcript_exists_key, exists, timeout=timeout) - return exists - except (RequestException, URLError) as e: - cache.set(transcript_connection_error_key, True, timeout=HOUR_SECS) - raise ConnectionError from e - - -def ensembl_transcript_exists(genome_build: GenomeBuild, identifier, version=None): - """ If version is passed, check that it's within range of 1 <= version <= latest_version """ - if genome_build.name == 'GRCh37': - api_base_url = "https://grch37.rest.ensembl.org" - else: - api_base_url = "https://rest.ensembl.org" - url = f"{api_base_url}/lookup/id/{identifier}" - r = requests.get(url, headers={"Content-Type": "application/json"}) - data = r.json() - - if not r.ok: - error = data.get("error") - if error: - if "not found" in error: - return False - raise ValueError(f"Unknown Ensembl API error response: '{error}'") - - if version: - version = int(version) - latest_version = int(data["version"]) - return 1 <= version <= latest_version - return True - - -def refseq_transcript_exists(identifier, version=None): - """ No way to tell whether it exists for a particular build or not """ - try: - accession = identifier - if version: - accession += "." + str(version) - Entrez.efetch(db='nuccore', id=accession, retmode='text') - return True - except HTTPError: - return False diff --git a/genes/hgvs.py b/genes/hgvs.py index da2e29483..a83ca2a89 100644 --- a/genes/hgvs.py +++ b/genes/hgvs.py @@ -756,7 +756,7 @@ def _variant_to_hgvs(self, variant: Variant, transcript_name=None) -> Tuple[HGVS raise ValueError(f"Could not convert {variant} to HGVS - tried: {attempts}") else: # No methods tried, mustn't have had any transcripts - raise TranscriptVersion.raise_bad_or_missing_transcript(self.genome_build, transcript_name) + raise TranscriptVersion.raise_bad_or_missing_transcript(transcript_name) else: hgvs_name = pyhgvs.variant_to_hgvs_name(chrom, offset, ref, alt, self.genome_build.genome_fasta.fasta, transcript=None, max_allele_length=sys.maxsize) diff --git a/genes/management/commands/import_refseq_transcript_fasta.py b/genes/management/commands/import_refseq_transcript_fasta.py new file mode 100644 index 000000000..75a537b1c --- /dev/null +++ b/genes/management/commands/import_refseq_transcript_fasta.py @@ -0,0 +1,59 @@ +import logging +from collections import defaultdict + +from Bio import SeqIO +from django.core.management import BaseCommand +from django.db.models import QuerySet + +from genes.models import TranscriptVersionInfo, TranscriptVersionInfoFastaFileImport, TranscriptVersion, Transcript +from genes.models_enums import AnnotationConsortium +from library.file_utils import file_md5sum, open_handle_gzip + + + + +class Command(BaseCommand): + + def add_arguments(self, parser): + parser.add_argument('--overwrite', action='store_true', help='Delete and replace fasta import with same md5sum') + parser.add_argument('filename') + + def handle(self, *args, **options): + filename = options["filename"] + overwrite = options["overwrite"] + + md5_hash = file_md5sum(filename) + if existing_import := TranscriptVersionInfoFastaFileImport.objects.filter(md5_hash=md5_hash).first(): + if overwrite: + print(f"Deleting existing TranscriptVersionInfos for fasta import {md5_hash}") + existing_import.delete() + else: + raise ValueError(f"Fasta import {md5_hash} exists, use --overwrite to delete old data") + + known_transcripts = set(Transcript.objects.all().values_list("identifier", flat=True)) + if not known_transcripts: + raise ValueError("No transcripts! Insert them first!") + + fasta_import = TranscriptVersionInfoFastaFileImport.objects.create(md5_hash=md5_hash, + annotation_consortium=AnnotationConsortium.REFSEQ, + filename=filename) + skipped_transcripts = 0 + records = [] + with open_handle_gzip(filename, "rt") as f: + for record in SeqIO.parse(f, "fasta"): + transcript_id, version = TranscriptVersion.get_transcript_id_and_version(record.id) + if transcript_id not in known_transcripts: + skipped_transcripts += 1 + continue + + tvi = TranscriptVersionInfo(transcript_id=transcript_id, version=version, + fasta_import=fasta_import, + sequence=str(record.seq), length=len(record.seq)) + records.append(tvi) + + print(f"Skipped {skipped_transcripts} transcripts not in our database") + if num_records := len(records): + print(f"Inserting {num_records} TranscriptVersionInfo records") + TranscriptVersionInfo.objects.bulk_create(records, ignore_conflicts=True, batch_size=2000) + + TranscriptVersionInfo.set_transcript_version_alignment_gap_if_length_different(records) diff --git a/genes/migrations/0039_transcriptversioninfo_transcriptversioninfofastafileimport.py b/genes/migrations/0039_transcriptversioninfo_transcriptversioninfofastafileimport.py new file mode 100644 index 000000000..4e66cacb8 --- /dev/null +++ b/genes/migrations/0039_transcriptversioninfo_transcriptversioninfofastafileimport.py @@ -0,0 +1,47 @@ +# Generated by Django 3.2.6 on 2021-09-14 02:01 + +from django.db import migrations, models +import django.db.models.deletion +import django_extensions.db.fields + + +class Migration(migrations.Migration): + + dependencies = [ + ('genes', '0038_lrgrefseqgene'), + ] + + operations = [ + migrations.CreateModel( + name='TranscriptVersionInfoFastaFileImport', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created', django_extensions.db.fields.CreationDateTimeField(auto_now_add=True, verbose_name='created')), + ('modified', django_extensions.db.fields.ModificationDateTimeField(auto_now=True, verbose_name='modified')), + ('md5_hash', models.CharField(max_length=32, unique=True)), + ('annotation_consortium', models.CharField(choices=[('R', 'RefSeq'), ('E', 'Ensembl')], max_length=1)), + ('filename', models.TextField()), + ], + options={ + 'get_latest_by': 'modified', + 'abstract': False, + }, + ), + migrations.CreateModel( + name='TranscriptVersionInfo', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created', django_extensions.db.fields.CreationDateTimeField(auto_now_add=True, verbose_name='created')), + ('modified', django_extensions.db.fields.ModificationDateTimeField(auto_now=True, verbose_name='modified')), + ('version', models.IntegerField()), + ('api_response', models.TextField(null=True)), + ('sequence', models.TextField()), + ('length', models.IntegerField()), + ('fasta_import', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='genes.transcriptversioninfofastafileimport')), + ('transcript', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='genes.transcript')), + ], + options={ + 'unique_together': {('transcript', 'version')}, + }, + ), + ] diff --git a/genes/models.py b/genes/models.py index d6eed4fa0..5cac53900 100644 --- a/genes/models.py +++ b/genes/models.py @@ -8,13 +8,17 @@ from dataclasses import dataclass from datetime import timedelta from functools import total_ordering -from typing import Tuple, Optional, Dict, List, Set, Union -from urllib.error import URLError +from io import StringIO +from typing import Tuple, Optional, Dict, List, Set, Union, Iterable +from urllib.error import URLError, HTTPError +import requests +from Bio import Entrez, SeqIO from django.conf import settings from django.contrib.auth.models import User from django.contrib.postgres.aggregates import StringAgg from django.contrib.postgres.fields import CITextField +from django.core.cache import cache from django.core.exceptions import PermissionDenied, ObjectDoesNotExist, MultipleObjectsReturned from django.db import models, IntegrityError, transaction from django.db.models import Min, Max, QuerySet, TextField @@ -31,16 +35,17 @@ from django_extensions.db.models import TimeStampedModel from guardian.shortcuts import get_objects_for_user from lazy import lazy +from requests import RequestException -from genes.annotation_consortium_api import transcript_exists from genes.gene_coverage import load_gene_coverage_df from genes.models_enums import AnnotationConsortium, HGNCStatus, GeneSymbolAliasSource +from library.constants import HOUR_SECS, WEEK_SECS from library.django_utils import SortByPKMixin from library.django_utils.django_partition import RelatedModelsPartitionModel from library.file_utils import mk_path from library.guardian_utils import assign_permission_to_user_and_groups, DjangoPermission from library.log_utils import log_traceback -from library.utils import empty_dict +from library.utils import empty_dict, get_single_element, iter_fixed_chunks from snpdb.models import Wiki, Company, Sample, DataState from snpdb.models.models_enums import ImportStatus from snpdb.models.models_genome import GenomeBuild @@ -547,6 +552,9 @@ class TranscriptVersion(SortByPKMixin, models.Model): Ensembl ID info: https://m.ensembl.org/Help/Faq?id=488 + There's currently multiple TranscriptVersion per genome build, this should probably be changed to only having + 1, merging in TranscriptVersionInfo and moving the data (which contains exons etc) into a related object + A useful query to get the latest version for each transcript is: qs.order_by("transcript_id", "-version").distinct("transcript_id") """ @@ -687,17 +695,27 @@ def filter_best_transcripts_by_accession(genome_build: GenomeBuild, transcript_a return TranscriptVersion.objects.filter(**kwargs).order_by(order_by) @staticmethod - def raise_bad_or_missing_transcript(genome_build: GenomeBuild, transcript_accession): + def raise_bad_or_missing_transcript(transcript_accession): """ Checks whether a transcript we can't match is wrong (their fault) or we don't have it (our fault) """ annotation_consortium = AnnotationConsortium.get_from_transcript_accession(transcript_accession).label - try: - identifier, version = TranscriptVersion.get_transcript_id_and_version(transcript_accession) - if transcript_exists(genome_build, identifier, version): - raise MissingTranscript(f"Transcript '{transcript_accession}' valid but missing from our (build: {genome_build}) database.") - raise BadTranscript(f"Transcript '{transcript_accession}' does not exist in the {genome_build} {annotation_consortium} database.") - except ConnectionError: - raise NoTranscript(f"Transcript '{transcript_accession}' missing from our DB - validity with {annotation_consortium} unknown") + key_base = f"transcript_exists:{transcript_accession}" + transcript_connection_error_key = key_base + ":ERROR" + if not cache.get(transcript_connection_error_key): + bad_transcript_key = key_base + "BAD" + if message := cache.get(bad_transcript_key): + raise BadTranscript(message) + try: + TranscriptVersionInfo.get(transcript_accession) # Throws BadTranscript + raise MissingTranscript(f"Transcript '{transcript_accession}' valid but missing from our database.") + except BadTranscript as bt: + # Only cache if we don't have it (DB will have it if we do) + cache.set(bad_transcript_key, str(bt), timeout=WEEK_SECS) + raise + except (RequestException, URLError) as e: + cache.set(transcript_connection_error_key, True, timeout=HOUR_SECS) + + raise NoTranscript(f"Transcript '{transcript_accession}' missing from our DB - validity with {annotation_consortium} unknown") @staticmethod def get_transcript_version(genome_build: GenomeBuild, transcript_name, best_attempt=True) -> Optional['TranscriptVersion']: @@ -729,7 +747,7 @@ def get_transcript_version(genome_build: GenomeBuild, transcript_name, best_atte transcript_version = transcript_versions_qs.last() if transcript_version is None: - TranscriptVersion.raise_bad_or_missing_transcript(genome_build, transcript_name) + TranscriptVersion.raise_bad_or_missing_transcript(transcript_name) if 'id' not in transcript_version.data: # only going to happen if we have legacy data in the database, transcripts that use the default for data {} @@ -861,10 +879,184 @@ def get_absolute_url(self): def get_external_url(self): return self.transcript.get_external_url(self.genome_build) + f".{self.version}" + @staticmethod + def update_accessions(accessions: Iterable[str], genome_build=None, **update_kwargs): + """ A way to quickly update lots of accessions, relying on the fact that there aren't that many versions """ + + tv_by_version = defaultdict(set) + for transcript_accession in accessions: + transcript_id, version = TranscriptVersion.get_transcript_id_and_version(transcript_accession) + tv_by_version[version].add(transcript_id) + + for version, transcripts in tv_by_version.items(): + filter_kwargs = {"version": version, "transcript_id__in": transcripts} + if genome_build: + filter_kwargs["genome_build"] = genome_build + TranscriptVersion.objects.filter(**filter_kwargs).update(**update_kwargs) + def __str__(self): return f"{self.accession} ({self.gene_version.gene_symbol}/{self.genome_build.name})" +class TranscriptVersionInfoFastaFileImport(TimeStampedModel): + md5_hash = models.CharField(max_length=32, unique=True) + annotation_consortium = models.CharField(max_length=1, choices=AnnotationConsortium.choices) + filename = models.TextField() + + +class TranscriptVersionInfo(TimeStampedModel): + """ Current main use of this is to download transcript version lengths from the web, and then + set TranscriptVersion.alignment_gap = True if they don't match + + Lengths not matching means there is a gap, but we can't be sure there isn't a gap """ + transcript = models.ForeignKey(Transcript, on_delete=CASCADE) + version = models.IntegerField() + # Data from Fasta file will have this set, from API will be populated with api_response + fasta_import = models.ForeignKey(TranscriptVersionInfoFastaFileImport, null=True, on_delete=CASCADE) + api_response = models.TextField(null=True) + sequence = models.TextField() + length = models.IntegerField() + + class Meta: + unique_together = ("transcript", "version") + + def __str__(self): + return f"{self.accession} ({self.length}bp)" + + @lazy + def accession(self): + return TranscriptVersion.get_accession(self.transcript_id, self.version) + + @staticmethod + def get(transcript_accession: str) -> 'TranscriptVersionInfo': + """ Returns DB copy if we have it, or retrieves + stores from API """ + + transcript_id, version = TranscriptVersion.get_transcript_id_and_version(transcript_accession) + if tvi := TranscriptVersionInfo.objects.filter(transcript_id=transcript_id, version=version).first(): + return tvi + + annotation_consortium = AnnotationConsortium.get_from_transcript_accession(transcript_accession) + if annotation_consortium == AnnotationConsortium.REFSEQ: + tvi = TranscriptVersionInfo._get_and_store_from_refseq_api(transcript_accession) + else: + tvi = TranscriptVersionInfo._get_and_store_from_ensembl_api(transcript_accession) + TranscriptVersionInfo.set_transcript_version_alignment_gap_if_length_different([tvi]) + return tvi + + @staticmethod + def _get_kwargs_from_genbank_record(record): + transcript_id, version = TranscriptVersion.get_transcript_id_and_version(record.id) + sequence = str(record.seq) + length = len(sequence) + return {"transcript_id": transcript_id, + "version": version, + "sequence": sequence, + "length": length} + + @staticmethod + def _get_and_store_from_refseq_api(transcript_accession): + try: + data = Entrez.efetch(db='nuccore', id=transcript_accession, rettype='gb', retmode='text') + except HTTPError as e: + if e.code == 400: + raise BadTranscript(f"Entrez error for '{transcript_accession}': {e}") + raise e + api_response = data.read() + with StringIO(api_response) as f: + records = list(SeqIO.parse(f, "genbank")) + record = get_single_element(records) + kwargs = TranscriptVersionInfo._get_kwargs_from_genbank_record(record) + return TranscriptVersionInfo.objects.create(**kwargs, api_response=api_response) + + @staticmethod + def _get_and_store_from_ensembl_api(transcript_accession): + transcript_id, version = TranscriptVersion.get_transcript_id_and_version(transcript_accession) + url = f"https://rest.ensembl.org/sequence/id/{transcript_id}/{version}?type=cdna" + r = requests.get(url, headers={"Content-Type": "application/json"}) + data = r.json() + + if r.ok: + transcript, _ = Transcript.objects.get_or_create(identifier=data["id"], + annotation_consortium=AnnotationConsortium.ENSEMBL) + return TranscriptVersionInfo.objects.create(transcript=transcript, + version=data["version"], + api_response=r.text, + sequence=data["seq"], + length=len(data["seq"])) + else: + error = data.get("error") + if error: + if "not found" in error: + raise BadTranscript(f"Ensembl API error response: '{error}'") + raise NoTranscript(f"Unable to understand Ensembl API response: {data}") + + @staticmethod + def get_refseq_transcript_versions(transcript_accessions: List[str]) -> Dict[str, 'TranscriptVersionInfo']: + """ Batch method - returns DB copies if we have it, retrieves + stores from API """ + # Find the ones we already have so we don't need to re-retrieve + all_transcript_accessions = set(transcript_accessions) + transcript_ids = [TranscriptVersion.get_transcript_id_and_version(a)[0] for a in transcript_accessions] + tvi_by_id = {} + for tvi in TranscriptVersionInfo.objects.filter(transcript_id__in=transcript_ids): + if tvi.accession in all_transcript_accessions: + tvi_by_id[tvi.accession] = tvi + + unknown_accessions = all_transcript_accessions - set(tvi_by_id) + ENTREZ_BATCH_SIZE = 500 + + for id_list in iter_fixed_chunks(unknown_accessions, ENTREZ_BATCH_SIZE): + id_param = ",".join(id_list) + search_results = Entrez.read(Entrez.epost("nuccore", id=id_param)) + fetch_handle = Entrez.efetch( + db="nuccore", + rettype="gb", + retmode="text", + webenv=search_results["WebEnv"], + query_key=search_results["QueryKey"], + idtype="acc", + ) + new_records = [] + for record in SeqIO.parse(fetch_handle, "genbank"): + # Store raw data so that we can retrieve more stuff from it later + s = StringIO() + SeqIO.write(record, s, "genbank") + s.seek(0) + api_response = s.read() + kwargs = TranscriptVersionInfo._get_kwargs_from_genbank_record(record) + tvi = TranscriptVersionInfo(**kwargs, api_response=api_response) + new_records.append(tvi) + + # Write them as we go so any failure only loses some + if new_records: + TranscriptVersionInfo.objects.bulk_create(new_records, ignore_conflicts=True, batch_size=2000) + for tvi in new_records: + tvi_by_id[tvi.accession] = tvi + TranscriptVersionInfo.set_transcript_version_alignment_gap_if_length_different(new_records) + + return tvi_by_id + + @staticmethod + def set_transcript_version_alignment_gap_if_length_different(tvis: Iterable['TranscriptVersionInfo']): + transcript_version_lengths = defaultdict(dict) + for tvi in tvis: + transcript_version_lengths[tvi.transcript_id][tvi.version] = tvi.length + + accessions_by_build_with_alignment_gap = defaultdict(list) + for tv in TranscriptVersion.objects.filter(alignment_gap=False, + transcript__in=transcript_version_lengths.keys()): + if known_length := transcript_version_lengths.get(tv.transcript_id, {}).get(tv.version): + if calc_length := tv.length: + if known_length != calc_length: + tv.alignment_gap = True + accessions_by_build_with_alignment_gap[tv.genome_build].append(tv.accession) + + for genome_build, accessions_with_alignment_gap in accessions_by_build_with_alignment_gap.items(): + logging.info("Setting %d records for %s with alignment_gap=True", + len(accessions_with_alignment_gap), genome_build) + TranscriptVersion.update_accessions(accessions_with_alignment_gap, + genome_build=genome_build, alignment_gap=True) + + class LRGRefSeqGene(models.Model): """ A Locus Reference Genomic (LRG) is a manually curated record that contains stable and thus, un-versioned reference sequences designed specifically for reporting sequence variants with clinical implications.