From a3a59337e73d77b9697b565f71f0164e645a2982 Mon Sep 17 00:00:00 2001 From: Clint Valentine Date: Thu, 13 Jan 2022 10:11:30 -0500 Subject: [PATCH 1/3] Add an entrypoint to SamSource for bytes of SAM/BAM/CRAM --- .../fulcrumgenomics/bam/api/SamSource.scala | 36 ++++++++++++++++--- .../fulcrumgenomics/bam/api/SamIoTest.scala | 31 ++++++++++++++-- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala b/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala index c9ea7703e..d997d07c3 100644 --- a/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala +++ b/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala @@ -24,13 +24,12 @@ package com.fulcrumgenomics.bam.api -import java.io.Closeable - import com.fulcrumgenomics.FgBioDef._ import com.fulcrumgenomics.bam.api.QueryType.QueryType import htsjdk.samtools._ import htsjdk.samtools.util.{Interval, Locatable} +import java.io.{Closeable, InputStream} import scala.collection.compat._ /** Companion to the [[SamSource]] class that provides factory methods for sources. */ @@ -66,6 +65,29 @@ object SamSource { index.foreach(i => input.index(i)) new SamSource(fac.open(input)) } + + /** Constructs a [[SamSource]] to read from the provided input stream. 
+ * + * @param stream the input stream of SAM/BAM/CRAM bytes + * @param ref an optional reference sequencing for decoding CRAM files + * @param async if true use extra thread(s) to speed up reading + * @param stringency the validation stringency to apply when reading the data + * @param factory a SAMRecordFactory; MUST return classes that mix in [[SamRecord]] + */ + def apply( + stream: InputStream, + ref: Option[PathToFasta], + async: Boolean, + stringency: ValidationStringency, + factory: SAMRecordFactory, + ): SamSource = { + val fac = SamReaderFactory.make() + fac.samRecordFactory(factory) + fac.setUseAsyncIo(async) + fac.validationStringency(stringency) + ref.foreach(fasta => fac.referenceSequence(fasta.toFile)) + new SamSource(fac.open(SamInputResource.of(stream)), closer = Some(() => stream.close())) + } } /** Describes the two types of queries that can be performed. */ @@ -78,7 +100,9 @@ object QueryType extends Enumeration { * A source class for reading SAM/BAM/CRAM files and for querying them. * @param reader the underlying [[SamReader]] */ -class SamSource private(private val reader: SamReader) extends View[SamRecord] with HeaderHelper with Closeable { +class SamSource private(private val reader: SamReader, private val closer: Option[Closeable] = None) + extends View[SamRecord] with HeaderHelper with Closeable { + /** The [[htsjdk.samtools.SAMFileHeader]] associated with the source. */ override val header: SAMFileHeader = reader.getFileHeader @@ -109,7 +133,11 @@ class SamSource private(private val reader: SamReader) extends View[SamRecord] w /** Provides a string that shows where the source is reading from. */ override def toString: String = s"SamReader(${reader.getResourceDescription})" - override def close(): Unit = this.reader.close() + /** Close an optional wrapped closeable and release the SAM reader. */ + override def close(): Unit = { + this.closer.foreach(_.close()) + this.reader.close() + } /** * Returns the underlying SamReader. 
This should be avoided as much as possible, and the diff --git a/src/test/scala/com/fulcrumgenomics/bam/api/SamIoTest.scala b/src/test/scala/com/fulcrumgenomics/bam/api/SamIoTest.scala index c6c0fb9b1..ab4b676d8 100644 --- a/src/test/scala/com/fulcrumgenomics/bam/api/SamIoTest.scala +++ b/src/test/scala/com/fulcrumgenomics/bam/api/SamIoTest.scala @@ -24,15 +24,15 @@ package com.fulcrumgenomics.bam.api -import java.nio.file.Files -import java.util.concurrent.{Callable, Executors, TimeUnit} - import com.fulcrumgenomics.FgBioDef._ +import com.fulcrumgenomics.bam.api.SamSource.{DefaultUseAsyncIo, DefaultValidationStringency} import com.fulcrumgenomics.fasta.{SequenceDictionary, SequenceMetadata} import com.fulcrumgenomics.testing.{SamBuilder, UnitSpec} import com.fulcrumgenomics.util.Io import htsjdk.samtools.GenomicIndexUtil +import java.nio.file.Files +import java.util.concurrent.{Callable, Executors, TimeUnit} import scala.util.Random class SamIoTest extends UnitSpec { @@ -164,4 +164,29 @@ class SamIoTest extends UnitSpec { filterCount shouldBe 10 mapCount shouldBe 10 } + + it should "allow reading from a stream of SAM bytes" in { + val builder = new SamBuilder() + builder.addPair(name = "q1", start1 = 100, start2 = 300) + builder.addPair(name = "q4", start1 = 200, start2 = 400) + builder.addPair(name = "q3", start1 = 300, start2 = 500) + builder.addPair(name = "q2", start1 = 400, start2 = 600) + + val sam = makeTempFile(getClass.getSimpleName, ".sam") + val out = SamWriter(sam, builder.header, sort = Some(SamOrder.Coordinate)) + builder.foreach(out.write) + out.close() + + val source = SamSource( + stream = Io.toInputStream(sam), + ref = None, + async = DefaultUseAsyncIo, + stringency = DefaultValidationStringency, + factory = SamRecord.Factory + ) + + source.indexed shouldBe false + source.toSeq.map(_.start) should contain theSameElementsInOrderAs Seq(100, 200, 300, 300, 400, 400, 500, 600) + source.close() + } } From c08b5604868a3ba4dc20c99be27e4e1e9dd26c27 Mon 
Sep 17 00:00:00 2001 From: Clint Valentine Date: Thu, 13 Jan 2022 10:45:39 -0500 Subject: [PATCH 2/3] chore: fixup typos and reference in scaladoc --- .../scala/com/fulcrumgenomics/bam/api/SamSource.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala b/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala index d997d07c3..22f71a2ed 100644 --- a/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala +++ b/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala @@ -42,10 +42,10 @@ object SamSource { * * @param path the path to read the SAM/BAM/CRAM from * @param index an optional path to read the index from - * @param ref an optional reference sequencing for decoding CRAM files + * @param ref an optional reference sequence for decoding CRAM files * @param async if true use extra thread(s) to speed up reading * @param stringency the validation stringency to apply when reading the data - * @param factory a SAMRecordFactory; MUST return classes that mix in [[SamRecord]] + * @param factory a [[SAMRecordFactory]]; MUST return classes that mix in [[SamRecord]] */ def apply(path: PathToBam, index: Option[FilePath] = None, @@ -69,10 +69,10 @@ object SamSource { /** Constructs a [[SamSource]] to read from the provided input stream. 
* * @param stream the input stream of SAM/BAM/CRAM bytes - * @param ref an optional reference sequencing for decoding CRAM files + * @param ref an optional reference sequence for decoding CRAM files * @param async if true use extra thread(s) to speed up reading * @param stringency the validation stringency to apply when reading the data - * @param factory a SAMRecordFactory; MUST return classes that mix in [[SamRecord]] + * @param factory a [[SAMRecordFactory]]; MUST return classes that mix in [[SamRecord]] */ def apply( stream: InputStream, From eeb9e83a1ae527feb8a33ffd0eaea8bc37793865 Mon Sep 17 00:00:00 2001 From: Clint Valentine Date: Thu, 13 Jan 2022 12:26:54 -0500 Subject: [PATCH 3/3] chore: move duplicated code to a private helper --- .../fulcrumgenomics/bam/api/SamSource.scala | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala b/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala index 22f71a2ed..30c19b10e 100644 --- a/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala +++ b/src/main/scala/com/fulcrumgenomics/bam/api/SamSource.scala @@ -37,8 +37,22 @@ object SamSource { var DefaultUseAsyncIo: Boolean = false var DefaultValidationStringency: ValidationStringency = ValidationStringency.STRICT - /** - * Constructs a [[SamSource]] to read from the provided path. + /** Build a [[SamReaderFactory]] configured with the given record factory, reference, async I/O setting, and validation stringency. */ + private def buildSamRecordFactory( + factory: SAMRecordFactory, + ref: Option[PathToFasta], + async: Boolean, + stringency: ValidationStringency, + ): SamReaderFactory = { + val fac = SamReaderFactory.make() + fac.samRecordFactory(factory) + fac.setUseAsyncIo(async) + fac.validationStringency(stringency) + ref.foreach(fac.referenceSequence) + fac + } + + /** Constructs a [[SamSource]] to read from the provided path. 
* * @param path the path to read the SAM/BAM/CRAM from * @param index an optional path to read the index from @@ -53,16 +67,9 @@ object SamSource { async: Boolean = DefaultUseAsyncIo, stringency: ValidationStringency = DefaultValidationStringency, factory: SAMRecordFactory = SamRecord.Factory): SamSource = { - // Configure the factory - val fac = SamReaderFactory.make() - fac.samRecordFactory(factory) - fac.setUseAsyncIo(async) - fac.validationStringency(stringency) - ref.foreach(r => fac.referenceSequence(r.toFile)) - - // Open the input(s) + val fac = buildSamRecordFactory(factory = factory, ref = ref, async = async, stringency = stringency) val input = SamInputResource.of(path) - index.foreach(i => input.index(i)) + index.foreach(input.index) new SamSource(fac.open(input)) } @@ -81,11 +88,7 @@ object SamSource { stringency: ValidationStringency, factory: SAMRecordFactory, ): SamSource = { - val fac = SamReaderFactory.make() - fac.samRecordFactory(factory) - fac.setUseAsyncIo(async) - fac.validationStringency(stringency) - ref.foreach(fasta => fac.referenceSequence(fasta.toFile)) + val fac = buildSamRecordFactory(factory = factory, ref = ref, async = async, stringency = stringency) new SamSource(fac.open(SamInputResource.of(stream)), closer = Some(() => stream.close())) } }