Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion isovar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "1.4.22"
__version__ = "1.4.23"


from .allele_read import AlleleRead
Expand Down
13 changes: 2 additions & 11 deletions isovar/allele_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,8 @@ def from_locus_read(cls, locus_read):

insertion = (n_ref_bases == 0)

if insertion:
# insertions require a sequence of non-aligned bases
# followed by the subsequence reference position
for read_index in range(read_base0_start_inclusive, read_base0_end_exclusive):
# all the inserted nucleotides should *not* align to the reference
if reference_positions[read_index] is not None:
logger.debug(
"Skipping read '%s', inserted nucleotides shouldn't map to reference",
read_name)
return None

# ReadCollector may canonicalize equivalent indels to a logical query
# interval even when the aligner placed the indel elsewhere in a repeat.
nucleotides_at_variant_locus = convert_from_bytes_if_necessary(
sequence[read_base0_start_inclusive:read_base0_end_exclusive])

Expand Down
233 changes: 230 additions & 3 deletions isovar/read_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,205 @@ def __init__(
self.min_mapping_quality = min_mapping_quality
self.use_soft_clipped_bases = use_soft_clipped_bases

@staticmethod
def _previous_fully_aligned_pair_index(aligned_pairs, start_index):
"""
Find the previous aligned-pair entry which maps both a query and
reference position.
"""
while start_index >= 0:
query_pos, ref_pos, _ = aligned_pairs[start_index]
if query_pos is not None and ref_pos is not None:
return start_index
start_index -= 1
return None

@classmethod
def _iter_left_aligned_indel_events(
cls,
aligned_pairs,
base1_start,
ref,
alt,
read_start,
read_end,
previous_aligned_index,
):
"""
Yield the queried indel and each successive one-base left shift within
the local alignment.

The query interval is shifted in lockstep with the indel representation
so that downstream AlleleRead objects see the same canonical split for
every equivalent alignment of the same indel.
"""
ref = ref.upper()
alt = alt.upper()

while True:
yield base1_start, ref, alt, read_start, read_end

if previous_aligned_index is None or base1_start <= 1:
break

_, _, previous_ref_base = aligned_pairs[previous_aligned_index]
if previous_ref_base is None:
break

previous_ref_base = previous_ref_base.upper()
longer_allele = alt if len(alt) > len(ref) else ref
if len(longer_allele) == 0 or previous_ref_base != longer_allele[-1]:
break

if len(alt) > len(ref):
alt = previous_ref_base + alt[:-1]
else:
ref = previous_ref_base + ref[:-1]

base1_start -= 1
read_start -= 1
read_end -= 1
previous_aligned_index = cls._previous_fully_aligned_pair_index(
aligned_pairs,
previous_aligned_index - 1,
)

@classmethod
def _left_aligned_indel_interval_for_variant(
cls,
pysam_aligned_segment,
trimmed_base1_start,
trimmed_ref,
trimmed_alt,
):
"""
If the read contains an equivalent indel aligned to the right of the
queried variant locus, return the canonical read interval for the
left-aligned representation of that indel.
"""
is_insertion = len(trimmed_ref) == 0 and len(trimmed_alt) > 0
is_deletion = len(trimmed_alt) == 0 and len(trimmed_ref) > 0
if not (is_insertion or is_deletion):
return None

try:
aligned_pairs = pysam_aligned_segment.get_aligned_pairs(
matches_only=False,
with_seq=True,
)
except ValueError:
logger.debug(
"Skipping indel left-alignment for read '%s' because the alignment "
"does not expose reference bases (for example, missing MD tag)",
pysam_aligned_segment.query_name,
)
return None

query_sequence = pysam_aligned_segment.query_sequence
if query_sequence is None:
return None
if isinstance(query_sequence, bytes):
query_sequence = query_sequence.decode("ascii")

i = 0
while i < len(aligned_pairs):
query_pos, ref_pos, ref_base = aligned_pairs[i]

if is_insertion and query_pos is not None and ref_pos is None:
j = i
while (
j < len(aligned_pairs)
and aligned_pairs[j][0] is not None
and aligned_pairs[j][1] is None
):
j += 1

previous_aligned_index = cls._previous_fully_aligned_pair_index(
aligned_pairs,
i - 1,
)
if previous_aligned_index is not None:
previous_query_pos, previous_ref_pos, _ = aligned_pairs[
previous_aligned_index
]
inserted_sequence = query_sequence[query_pos:aligned_pairs[j - 1][0] + 1]
for (
normalized_base1_start,
normalized_ref,
normalized_alt,
normalized_read_start,
normalized_read_end,
) in cls._iter_left_aligned_indel_events(
aligned_pairs=aligned_pairs,
base1_start=previous_ref_pos + 1,
ref="",
alt=inserted_sequence,
read_start=query_pos,
read_end=aligned_pairs[j - 1][0] + 1,
previous_aligned_index=previous_aligned_index,
):
if (
normalized_base1_start == trimmed_base1_start
and normalized_ref == trimmed_ref
and normalized_alt == trimmed_alt.upper()
):
return normalized_read_start, normalized_read_end
i = j
continue

if is_deletion and query_pos is None and ref_pos is not None:
j = i
while (
j < len(aligned_pairs)
and aligned_pairs[j][0] is None
and aligned_pairs[j][1] is not None
):
j += 1

previous_aligned_index = cls._previous_fully_aligned_pair_index(
aligned_pairs,
i - 1,
)
if previous_aligned_index is not None:
previous_query_pos, _, _ = aligned_pairs[previous_aligned_index]
deleted_sequence = "".join(
aligned_pairs[k][2].upper() for k in range(i, j)
)
for (
normalized_base1_start,
normalized_ref,
normalized_alt,
normalized_read_start,
normalized_read_end,
) in cls._iter_left_aligned_indel_events(
aligned_pairs=aligned_pairs,
base1_start=ref_pos + 1,
ref=deleted_sequence,
alt="",
read_start=previous_query_pos + 1,
read_end=previous_query_pos + 1,
previous_aligned_index=previous_aligned_index,
):
if (
normalized_base1_start == trimmed_base1_start
and normalized_ref == trimmed_ref.upper()
and normalized_alt == trimmed_alt
):
return normalized_read_start, normalized_read_end
i = j
continue

i += 1
return None

def locus_read_from_pysam_aligned_segment(
self, pysam_aligned_segment, base0_start_inclusive, base0_end_exclusive
self,
pysam_aligned_segment,
base0_start_inclusive,
base0_end_exclusive,
trimmed_base1_start=None,
trimmed_ref=None,
trimmed_alt=None,
):
"""
Create LocusRead from pysam.AlignedSegment object and the start/end indices
Expand Down Expand Up @@ -159,6 +356,19 @@ def locus_read_from_pysam_aligned_segment(
if reference_interval_size < 0:
raise ValueError("Unexpected interval start after interval end")

normalized_indel_interval = None
if (
trimmed_base1_start is not None
and trimmed_ref is not None
and trimmed_alt is not None
):
normalized_indel_interval = self._left_aligned_indel_interval_for_variant(
pysam_aligned_segment=pysam_aligned_segment,
trimmed_base1_start=trimmed_base1_start,
trimmed_ref=trimmed_ref,
trimmed_alt=trimmed_alt,
)

# TODO:
# Consider how to handle variants before splice sites, where
# the bases before or after on the genome will not be mapped on the
Expand All @@ -167,7 +377,11 @@ def locus_read_from_pysam_aligned_segment(
# we have a dictionary mapping base-1 reference positions to base-0
# read indices and we need to use that to convert the reference
# half-open interval into a half-open interval on the read.
if reference_interval_size == 0:
if normalized_indel_interval is not None:
read_base0_start_inclusive, read_base0_end_exclusive = (
normalized_indel_interval
)
elif reference_interval_size == 0:
# Reference interval is between two bases but read may contain
# insertion.
#
Expand Down Expand Up @@ -301,7 +515,14 @@ def locus_read_from_pysam_aligned_segment(
)

def get_locus_reads(
self, alignment_file, chromosome, base0_start_inclusive, base0_end_exclusive
self,
alignment_file,
chromosome,
base0_start_inclusive,
base0_end_exclusive,
trimmed_base1_start=None,
trimmed_ref=None,
trimmed_alt=None,
):
"""
Create LocusRead objects for reads which overlap the given chromosome,
Expand Down Expand Up @@ -354,6 +575,9 @@ def get_locus_reads(
aligned_segment,
base0_start_inclusive=base0_start_inclusive,
base0_end_exclusive=base0_end_exclusive,
trimmed_base1_start=trimmed_base1_start,
trimmed_ref=trimmed_ref,
trimmed_alt=trimmed_alt,
)
if read is not None:
reads.append(read)
Expand Down Expand Up @@ -479,6 +703,9 @@ def locus_reads_overlapping_variant(self, alignment_file, variant, chromosome=No
chromosome=chromosome,
base0_start_inclusive=base0_start_inclusive,
base0_end_exclusive=base0_end_exclusive,
trimmed_base1_start=base1_position,
trimmed_ref=ref,
trimmed_alt=alt,
)

def allele_reads_overlapping_variant(self, variant, alignment_file):
Expand Down
64 changes: 64 additions & 0 deletions tests/test_locus_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .mock_objects import MockAlignmentFile, make_pysam_read
from .testing_helpers import assert_equal_fields, load_bam, data_path
from .common import eq_
from .genomes_for_testing import grch38


class TrackingReferencePositions(list):
Expand Down Expand Up @@ -454,3 +455,66 @@ def test_locus_read_insertion_locus_with_insertion_at_end_of_read():
assert result is not None, "Insertion at the end of a read should be kept"
eq_(result.read_base0_start_inclusive, 1)
eq_(result.read_base0_end_exclusive, 2)


def test_locus_read_left_aligns_equivalent_homopolymer_insertion():
"""
A read whose insertion is aligned to the right within a homopolymer should
still be rewritten to the canonical left-aligned read interval for the
queried variant.

Regression test for GitHub issue #79.
"""
variant = Variant(
"1",
1,
"A",
"AA",
grch38,
normalize_contig_names=False,
)
pysam_read = make_pysam_read(
seq="AAAA",
cigar="2M1I1M",
mdtag="3",
reference_start=0,
)
read_collector = ReadCollector()
reads = read_collector.locus_reads_overlapping_variant(
MockAlignmentFile(references=("1",), reads=[pysam_read]),
variant,
chromosome="1",
)
assert len(reads) == 1
eq_(reads[0].read_base0_start_inclusive, 1)
eq_(reads[0].read_base0_end_exclusive, 2)


def test_locus_read_left_aligns_equivalent_homopolymer_deletion():
"""
A deletion aligned to the right within a homopolymer should also be
rewritten to the canonical left-aligned query interval.
"""
variant = Variant(
"1",
1,
"AA",
"A",
grch38,
normalize_contig_names=False,
)
pysam_read = make_pysam_read(
seq="AAA",
cigar="2M1D1M",
mdtag="2^A1",
reference_start=0,
)
read_collector = ReadCollector()
reads = read_collector.locus_reads_overlapping_variant(
MockAlignmentFile(references=("1",), reads=[pysam_read]),
variant,
chromosome="1",
)
assert len(reads) == 1
eq_(reads[0].read_base0_start_inclusive, 1)
eq_(reads[0].read_base0_end_exclusive, 1)
Loading
Loading