Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"bioutils",
"requests",
"canonicaljson",
"setuptools>=78.1.0",
]

[project.optional-dependencies]
Expand All @@ -49,6 +50,7 @@ extras = [
"hgvs>=1.4",
"dill~=0.3.7",
"click",
"pysam>=0.23.0",
]
dev = [
# tests
Expand Down
108 changes: 64 additions & 44 deletions src/ga4gh/vrs/extras/annotator/vcf.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
"""Annotate VCFs with VRS identifiers and attributes."""

import asyncio

Check failure on line 2 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (F401)

src/ga4gh/vrs/extras/annotator/vcf.py:2:8: F401 `asyncio` imported but unused
import concurrent.futures

Check failure on line 3 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (F401)

src/ga4gh/vrs/extras/annotator/vcf.py:3:8: F401 `concurrent.futures` imported but unused
import abc
import logging
import os
import pickle
from enum import Enum
from pathlib import Path
from typing import Literal
from queue import Queue
from threading import Thread



import pysam

from ga4gh.core.identifiers import (
VrsObjectIdentifierIs,
use_ga4gh_compute_identifier_when,
)
from ga4gh.vrs.dataproxy import _DataProxy
from ga4gh.vrs.extras.translator import AlleleTranslator
from ga4gh.vrs.models import Allele

Check failure on line 24 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (I001)

src/ga4gh/vrs/extras/annotator/vcf.py:2:1: I001 Import block is un-sorted or un-formatted

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -216,55 +222,69 @@
vcf_out = None

allele_collection = [] if self.collect_alleles else None
record_queue = Queue()
result_queue = Queue()
num_workers = os.cpu_count() or 4 # Adjust based on system resources

Check failure on line 228 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (W293)

src/ga4gh/vrs/extras/annotator/vcf.py:228:1: W293 Blank line contains whitespace
def worker():

Check failure on line 229 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (ANN202)

src/ga4gh/vrs/extras/annotator/vcf.py:229:13: ANN202 Missing return type annotation for private function `worker`
while True:
record = record_queue.get()
if record is None:
break
try:
additional_info_fields = [FieldName.IDS_FIELD]
if vrs_attributes:
additional_info_fields += [
FieldName.STARTS_FIELD,
FieldName.ENDS_FIELD,
FieldName.STATES_FIELD,
]
vrs_field_data = self._get_vrs_data(
record,
allele_collection,
assembly,
additional_info_fields,
vrs_attributes=vrs_attributes,
compute_for_ref=compute_for_ref,
require_validation=require_validation,
**kwargs,
)
except Exception as ex:
_logger.exception("VRS error on %s-%s", record.chrom, record.pos)
err_msg = f"{ex}" or f"{type(ex)}"
err_msg = err_msg.translate(VCF_ESCAPE_MAP)
additional_info_fields = [FieldName.ERROR_FIELD]
vrs_field_data = {FieldName.ERROR_FIELD.value: [err_msg]}
result_queue.put((record, vrs_field_data))
record_queue.task_done()

Check failure on line 260 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (W293)

src/ga4gh/vrs/extras/annotator/vcf.py:260:1: W293 Blank line contains whitespace
workers = [Thread(target=worker) for _ in range(num_workers)]
for w in workers:
w.start()

Check failure on line 264 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (W293)

src/ga4gh/vrs/extras/annotator/vcf.py:264:1: W293 Blank line contains whitespace
for record in vcf:
if vcf_out:
additional_info_fields = [FieldName.IDS_FIELD]
if vrs_attributes:
additional_info_fields += [
FieldName.STARTS_FIELD,
FieldName.ENDS_FIELD,
FieldName.STATES_FIELD,
]
else:
# no INFO field names need to be designated if not producing an annotated VCF
additional_info_fields = []
try:
vrs_field_data = self._get_vrs_data(
record,
allele_collection,
assembly,
additional_info_fields,
vrs_attributes=vrs_attributes,
compute_for_ref=compute_for_ref,
require_validation=require_validation,
**kwargs,
)
except Exception as ex:
_logger.exception("VRS error on %s-%s", record.chrom, record.pos)
err_msg = f"{ex}" or f"{type(ex)}"
err_msg = err_msg.translate(VCF_ESCAPE_MAP)
additional_info_fields = [FieldName.ERROR_FIELD]
vrs_field_data = {FieldName.ERROR_FIELD.value: [err_msg]}

_logger.debug(
"VCF record %s-%s generated vrs_field_data %s",
record.chrom,
record.pos,
vrs_field_data,
)

record_queue.put(record)

Check failure on line 267 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (W293)

src/ga4gh/vrs/extras/annotator/vcf.py:267:1: W293 Blank line contains whitespace
record_queue.join()

Check failure on line 269 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (W293)

src/ga4gh/vrs/extras/annotator/vcf.py:269:1: W293 Blank line contains whitespace
for _ in range(num_workers):
record_queue.put(None)
for w in workers:
w.join()

Check failure on line 274 in src/ga4gh/vrs/extras/annotator/vcf.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (W293)

src/ga4gh/vrs/extras/annotator/vcf.py:274:1: W293 Blank line contains whitespace
while not result_queue.empty():
record, vrs_field_data = result_queue.get()
if output_vcf_path and vcf_out:
for k in additional_info_fields:
record.info[k.value] = [
value or k.default_value() for value in vrs_field_data[k.value]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part I wasn't sure about

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@quinnwai FYI - can you take a look?

for k in vrs_field_data:
record.info[k] = [
value for value in vrs_field_data[k]
]
vcf_out.write(record)

vcf.close()

vcf_out.write(record)

if vcf_out:
vcf_out.close()

if self.collect_alleles:
self.on_vrs_object_collection(allele_collection, **kwargs)

Expand Down
Loading